GH-499,498 Add support for SupplierExporter to control output content-type
- Add 'contentType' property to ExporterProperties to assist SupplierExporter with delegating it to function catalog - Add additional logging and testing - Change JsonMapper to abstract class providing special handling of conversion of Json Sting to byte[]
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2012-2019 the original author or authors.
|
||||
* Copyright 2012-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -127,8 +127,10 @@ public class RequestProcessor {
|
||||
|
||||
public Mono<ResponseEntity<?>> post(FunctionWrapper wrapper,
|
||||
ServerWebExchange exchange) {
|
||||
return Mono.from(body(wrapper.handler(), exchange))
|
||||
Mono<ResponseEntity<?>> responseEntity = Mono.from(body(wrapper.handler(), exchange))
|
||||
.flatMap(body -> response(wrapper, body, false));
|
||||
|
||||
return responseEntity;
|
||||
}
|
||||
|
||||
public Mono<ResponseEntity<?>> post(FunctionWrapper wrapper, String body,
|
||||
@@ -149,7 +151,7 @@ public class RequestProcessor {
|
||||
jsonType = ResolvableType.forClassWithGenerics((Class<?>) jsonType,
|
||||
(Class<?>) itemType).getType();
|
||||
}
|
||||
input = this.mapper.toObject((String) input, jsonType);
|
||||
input = this.mapper.fromJson((String) input, jsonType);
|
||||
}
|
||||
else {
|
||||
input = this.converter.convert(function, (String) input);
|
||||
@@ -178,7 +180,6 @@ public class RequestProcessor {
|
||||
|
||||
private Mono<ResponseEntity<?>> response(FunctionWrapper request, Object handler,
|
||||
Publisher<?> result, Boolean single, boolean getter) {
|
||||
|
||||
BodyBuilder builder = ResponseEntity.ok();
|
||||
if (this.inspector.isMessage(handler)) {
|
||||
result = Flux.from(result)
|
||||
@@ -389,7 +390,11 @@ public class RequestProcessor {
|
||||
exchange.getLogPrefix() + "0..1 [" + elementType + "]");
|
||||
}
|
||||
Mono<?> mono = reader.readMono(actualType, elementType, request,
|
||||
response, readHints);
|
||||
response, readHints).doOnNext(v -> {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("received: " + v);
|
||||
}
|
||||
});
|
||||
mono = mono.onErrorResume(
|
||||
ex -> Mono.error(handleReadError(bodyParam, ex)));
|
||||
if (isBodyRequired) {
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2012-2019 the original author or authors.
|
||||
* Copyright 2012-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
||||
@@ -24,6 +24,7 @@ import org.springframework.cloud.function.context.FunctionProperties;
|
||||
|
||||
/**
|
||||
* @author Dave Syer
|
||||
* @author Oleg Zhurakousky
|
||||
*
|
||||
*/
|
||||
@ConfigurationProperties(prefix = FunctionProperties.PREFIX + ".web.export")
|
||||
@@ -149,6 +150,11 @@ public class ExporterProperties {
|
||||
*/
|
||||
private String name;
|
||||
|
||||
/**
|
||||
* Content type to use when serializing source's output for transport (default 'application/json`).
|
||||
*/
|
||||
private String contentType = "application/json";
|
||||
|
||||
public String getName() {
|
||||
return this.name;
|
||||
}
|
||||
@@ -169,6 +175,13 @@ public class ExporterProperties {
|
||||
return this.headers;
|
||||
}
|
||||
|
||||
public String getContentType() {
|
||||
return contentType;
|
||||
}
|
||||
|
||||
public void setContentType(String contentType) {
|
||||
this.contentType = contentType;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2012-2019 the original author or authors.
|
||||
* Copyright 2012-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -58,6 +58,8 @@ public class SupplierExporter implements SmartLifecycle {
|
||||
|
||||
private final String supplier;
|
||||
|
||||
private final String contentType;
|
||||
|
||||
private volatile boolean running;
|
||||
|
||||
private volatile boolean ok = true;
|
||||
@@ -70,14 +72,15 @@ public class SupplierExporter implements SmartLifecycle {
|
||||
|
||||
SupplierExporter(RequestBuilder requestBuilder,
|
||||
DestinationResolver destinationResolver, FunctionCatalog catalog,
|
||||
WebClient client, ExporterProperties props) {
|
||||
WebClient client, ExporterProperties exporterProperties) {
|
||||
this.requestBuilder = requestBuilder;
|
||||
this.destinationResolver = destinationResolver;
|
||||
this.catalog = catalog;
|
||||
this.client = client;
|
||||
this.debug = props.isDebug();
|
||||
this.autoStartup = props.isAutoStartup();
|
||||
this.supplier = props.getSink().getName();
|
||||
this.debug = exporterProperties.isDebug();
|
||||
this.autoStartup = exporterProperties.isAutoStartup();
|
||||
this.supplier = exporterProperties.getSink().getName();
|
||||
this.contentType = exporterProperties.getSink().getContentType();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -93,7 +96,7 @@ public class SupplierExporter implements SmartLifecycle {
|
||||
|
||||
boolean suppliersPresent = false;
|
||||
for (String name : names) {
|
||||
Supplier<Publisher<Object>> supplier = this.catalog.lookup(Supplier.class, name);
|
||||
Supplier<Publisher<Object>> supplier = this.catalog.lookup(name, this.contentType);
|
||||
if (supplier == null) {
|
||||
logger.warn("No such Supplier: " + name);
|
||||
continue;
|
||||
@@ -163,8 +166,7 @@ public class SupplierExporter implements SmartLifecycle {
|
||||
|
||||
private Flux<ClientResponse> forward(Supplier<Publisher<Object>> supplier, String name) {
|
||||
return Flux.from(supplier.get()).flatMap(value -> {
|
||||
String destination = this.destinationResolver.destination(supplier, name,
|
||||
value);
|
||||
String destination = this.destinationResolver.destination(supplier, name, value);
|
||||
if (this.debug) {
|
||||
logger.info("Posting to: " + destination);
|
||||
}
|
||||
@@ -178,9 +180,17 @@ public class SupplierExporter implements SmartLifecycle {
|
||||
Message<?> message = (Message<?>) value;
|
||||
body = message.getPayload();
|
||||
}
|
||||
if (this.debug) {
|
||||
logger.debug("Sending BODY as type: " + body.getClass().getName());
|
||||
}
|
||||
Mono<ClientResponse> result = this.client.post().uri(uri)
|
||||
.headers(headers -> headers(headers, destination, value)).bodyValue(body)
|
||||
.exchange();
|
||||
.exchange()
|
||||
.doOnNext(response -> {
|
||||
if (this.debug) {
|
||||
logger.debug("Response STATUS: " + response.statusCode());
|
||||
}
|
||||
});
|
||||
if (this.debug) {
|
||||
result = result.log();
|
||||
}
|
||||
|
||||
@@ -52,6 +52,7 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||
"spring.cloud.function.web.export.sink.url=http://localhost:${my.port}",
|
||||
"spring.cloud.function.web.export.source.url=http://localhost:${my.port}",
|
||||
"spring.cloud.function.web.export.sink.name=origin|uppercase",
|
||||
"spring.cloud.function.web.export.sink.contentType=text/plain",
|
||||
"spring.cloud.function.web.export.debug=true" })
|
||||
public class FunctionalExporterTests {
|
||||
|
||||
|
||||
@@ -85,8 +85,8 @@ public class FunctionAutoConfigurationIntegrationTests {
|
||||
}
|
||||
// It completed
|
||||
assertThat(this.forwarder.isOk()).isTrue();
|
||||
assertThat(this.app.inputs).contains("HELLO");
|
||||
assertThat(this.app.inputs).contains("WORLD");
|
||||
assertThat(this.app.inputs).contains("\"HELLO\"");
|
||||
assertThat(this.app.inputs).contains("\"WORLD\"");
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
|
||||
Reference in New Issue
Block a user