GH-238 Added initial support for RoutingFunction

- Added initial implementation of RoutingFunction which is bootstrapped optionally based on setting ‘spring.cloud.function.routing.enabled’ property to true.
- Added initial documentation and tests

Resolves #238
This commit is contained in:
Oleg Zhurakousky
2019-05-03 23:15:40 +02:00
parent abeb652830
commit 4d9cdb9750
15 changed files with 557 additions and 187 deletions

View File

@@ -38,7 +38,9 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.context.config.RoutingFunction;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.function.core.FluxConsumer;
import org.springframework.cloud.function.core.FluxedConsumer;
@@ -77,6 +79,8 @@ public class RequestProcessor {
private final FunctionInspector inspector;
private final FunctionCatalog functionCatalog;
private final StringConverter converter;
private final JsonMapper mapper;
@@ -84,10 +88,12 @@ public class RequestProcessor {
private final List<HttpMessageReader<?>> messageReaders;
public RequestProcessor(FunctionInspector inspector,
FunctionCatalog functionCatalog,
ObjectProvider<JsonMapper> mapper, StringConverter converter,
ObjectProvider<ServerCodecConfigurer> codecs) {
this.mapper = mapper.getIfAvailable();
this.inspector = inspector;
this.functionCatalog = functionCatalog;
this.converter = converter;
ServerCodecConfigurer source = codecs.getIfAvailable();
this.messageReaders = source == null ? null : source.getReaders();
@@ -97,23 +103,23 @@ public class RequestProcessor {
Function<? extends Publisher<?>, ? extends Publisher<?>> function,
Consumer<? extends Publisher<?>> consumer,
Supplier<? extends Publisher<?>> supplier) {
return new FunctionWrapper(function, consumer, supplier);
return new FunctionWrapper(function, supplier);
}
public static FunctionWrapper wrapper(
Function<? extends Publisher<?>, ? extends Publisher<?>> function) {
return new FunctionWrapper(function, null, null);
return new FunctionWrapper(function, null);
}
public Mono<ResponseEntity<?>> get(FunctionWrapper wrapper) {
if (wrapper.function() != null) {
return response(wrapper, wrapper.function(),
value(wrapper.function(), wrapper.argument()), true, true);
return response(wrapper, wrapper.function(), value(wrapper), true, true);
}
else {
return response(wrapper, wrapper.supplier(), wrapper.supplier().get(), null,
true);
}
}
public Mono<ResponseEntity<?>> post(FunctionWrapper wrapper,
@@ -130,7 +136,8 @@ public class RequestProcessor {
Object input = body == null && inputType.isAssignableFrom(String.class) ? "" : body;
if (input != null) {
if ((isInputMultiple(this.getTargetIfRouting(wrapper, function)) || !(function instanceof RoutingFunction))
&& input != null) { // TODO rework. . . pretty ugly
if (this.shouldUseJsonConversion((String) input, wrapper.headers.getContentType())) {
Type jsonType = body.startsWith("[")
&& Collection.class.isAssignableFrom(inputType)
@@ -149,28 +156,53 @@ public class RequestProcessor {
return response(wrapper, input, stream);
}
public Mono<ResponseEntity<?>> stream(FunctionWrapper request) {
Publisher<?> result = request.function() != null
? value(request)
: request.supplier().get();
return stream(request, result);
}
private boolean shouldUseJsonConversion(String body, MediaType contentType) {
return (body.startsWith("[") || body.startsWith("{"))
&& (contentType == null || (contentType != null
&& !"text".equalsIgnoreCase(contentType.getType())));
}
public Mono<ResponseEntity<?>> stream(FunctionWrapper request) {
Publisher<?> result = request.function() != null
? value(request.function(), request.argument())
: request.supplier().get();
return stream(request, result);
}
private List<HttpMessageReader<?>> getMessageReaders() {
return this.messageReaders;
}
private Mono<ResponseEntity<?>> response(FunctionWrapper request, Object handler,
Publisher<?> result, Boolean single, boolean getter) {
BodyBuilder builder = ResponseEntity.ok();
if (this.inspector.isMessage(handler)) {
result = Flux.from(result)
.map(message -> MessageUtils.unpack(handler, message))
.doOnNext(value -> addHeaders(builder, value))
.map(message -> message.getPayload());
}
else {
builder.headers(HeaderUtils.sanitize(request.headers()));
}
if (isOutputSingle(handler)
&& (single != null && single || getter || isInputMultiple(handler))) {
result = Mono.from(result);
}
if (result instanceof Flux) {
result = Flux.from(result).collectList();
}
return Mono.from(result).flatMap(body -> Mono.just(builder.body(body)));
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private Mono<ResponseEntity<?>> response(FunctionWrapper wrapper, Object body,
boolean stream) {
Function<Publisher<?>, Publisher<?>> function = wrapper.function();
Consumer<Publisher<?>> consumer = wrapper.consumer();
Function function = wrapper.function();
Flux<?> flux;
if (body != null) {
@@ -197,7 +229,7 @@ public class RequestProcessor {
}
if (this.inspector.isMessage(function)) {
flux = messages(wrapper, function == null ? consumer : function, flux);
flux = messages(wrapper, function, flux);
}
Mono<ResponseEntity<?>> responseEntityMono = null;
@@ -208,19 +240,33 @@ public class RequestProcessor {
.just(ResponseEntity.status(HttpStatus.ACCEPTED).build());
}
else {
Flux<?> result = Flux.from(function.apply(flux));
Flux<?> result = Flux.from((Publisher) function.apply(flux));
logger.debug("Handled POST with function");
if (stream) {
responseEntityMono = stream(wrapper, result);
}
else {
responseEntityMono = response(wrapper, function, result,
responseEntityMono = response(wrapper, getTargetIfRouting(wrapper, function), result,
body == null ? null : !(body instanceof Collection), false);
}
}
return responseEntityMono;
}
/*
* Called when building response and returns the actual
* target function in case the current function is RoutingFunction.
* This is necessary to determine the type of the output (e.g., Flux =
* multiple or Mono = single etc). See isOutputSingle(..).
*/
private Object getTargetIfRouting(FunctionWrapper wrapper, Object function) {
if (function instanceof RoutingFunction) {
String name = wrapper.headers.get("function.name").iterator().next();
function = this.functionCatalog.lookup(name);
}
return function;
}
private Flux<?> messages(FunctionWrapper request, Object function, Flux<?> flux) {
Map<String, Object> headers = HeaderUtils.fromHttp(request.headers());
return flux.map(payload -> MessageUtils.create(function, payload, headers));
@@ -246,30 +292,7 @@ public class RequestProcessor {
return Flux.from(output).then(Mono.fromSupplier(() -> builder.body(output)));
}
private Mono<ResponseEntity<?>> response(FunctionWrapper request, Object handler,
Publisher<?> result, Boolean single, boolean getter) {
BodyBuilder builder = ResponseEntity.ok();
if (this.inspector.isMessage(handler)) {
result = Flux.from(result)
.map(message -> MessageUtils.unpack(handler, message))
.doOnNext(value -> addHeaders(builder, value))
.map(message -> message.getPayload());
}
else {
builder.headers(HeaderUtils.sanitize(request.headers()));
}
if (isOutputSingle(handler)
&& (single != null && single || getter || isInputMultiple(handler))) {
result = Mono.from(result);
}
if (result instanceof Flux) {
result = Flux.from(result).collectList();
}
return Mono.from(result).flatMap(body -> Mono.just(builder.body(body)));
}
private boolean isInputMultiple(Object handler) {
Class<?> type = this.inspector.getInputType(handler);
@@ -373,11 +396,13 @@ public class RequestProcessor {
return ReactiveAdapterRegistry.getSharedInstance();
}
private Publisher<?> value(Function<Publisher<?>, Publisher<?>> function,
Publisher<String> value) {
Flux<?> input = Flux.from(value)
.map(body -> this.converter.convert(function, body));
return Mono.from(function.apply(input));
private Publisher<?> value(FunctionWrapper wrapper) {
Flux<?> input = Flux.from(wrapper.argument)
.map(body -> this.converter.convert(wrapper.function, body));
if (this.inspector.isMessage(wrapper.function)) {
input = messages(wrapper, wrapper.function, input);
}
return Mono.from(wrapper.function.apply(input));
}
private Type getItemType(Object function) {
@@ -413,8 +438,6 @@ public class RequestProcessor {
private final Function<Publisher<?>, Publisher<?>> function;
private final Consumer<Publisher<?>> consumer;
private final Supplier<Publisher<?>> supplier;
private final MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
@@ -426,26 +449,21 @@ public class RequestProcessor {
@SuppressWarnings("unchecked")
public FunctionWrapper(
Function<? extends Publisher<?>, ? extends Publisher<?>> function,
Consumer<? extends Publisher<?>> consumer,
Supplier<? extends Publisher<?>> supplier) {
this.function = (Function<Publisher<?>, Publisher<?>>) function;
this.consumer = (Consumer<Publisher<?>>) consumer;
this.supplier = (Supplier<Publisher<?>>) supplier;
}
public Object handler() {
return this.function != null ? this.function
: this.consumer != null ? this.consumer : this.supplier;
return this.function != null
? this.function
: this.supplier;
}
public Function<Publisher<?>, Publisher<?>> function() {
return this.function;
}
public Consumer<Publisher<?>> consumer() {
return this.consumer;
}
public Supplier<Publisher<?>> supplier() {
return this.supplier;
}

View File

@@ -101,6 +101,7 @@ class FunctionEndpointInitializer implements ApplicationContextInitializer<Gener
() -> new BasicStringConverter(context.getBean(FunctionInspector.class), context.getBeanFactory()));
context.registerBean(RequestProcessor.class,
() -> new RequestProcessor(context.getBean(FunctionInspector.class),
context.getBean(FunctionCatalog.class),
context.getBeanProvider(JsonMapper.class), context.getBean(StringConverter.class),
context.getBeanProvider(ServerCodecConfigurer.class)));
context.registerBean(FunctionEndpointFactory.class,

View File

@@ -16,21 +16,19 @@
package org.springframework.cloud.function.web.mvc;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.HashMap;
import javax.servlet.http.HttpServletRequest;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.web.constants.WebRequestConstants;
import org.springframework.cloud.function.web.util.FunctionWebUtils;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.util.StringUtils;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.servlet.HandlerMapping;
@@ -38,7 +36,7 @@ import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandl
/**
* @author Dave Syer
*
* @author Oleg Zhurakousky
*/
@Configuration
@ConditionalOnClass(RequestMappingHandlerMapping.class)
@@ -92,7 +90,9 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping
if (path.startsWith(this.prefix)) {
path = path.substring(this.prefix.length());
}
Object function = findFunctionForGet(request, path);
Object function = FunctionWebUtils.findFunction(HttpMethod.resolve(request.getMethod()),
this.functions, new HttpRequestAttributeDelegate(request), path);
if (function != null) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Found function for GET: " + path);
@@ -100,65 +100,20 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping
request.setAttribute(WebRequestConstants.HANDLER, function);
return handler;
}
function = findFunctionForPost(request, path);
if (function != null) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Found function for POST: " + path);
}
request.setAttribute(WebRequestConstants.HANDLER, function);
return handler;
}
return null;
}
private Object findFunctionForPost(HttpServletRequest request, String path) {
if (!request.getMethod().equals("POST")) {
return null;
@SuppressWarnings("serial")
private static class HttpRequestAttributeDelegate extends HashMap<String, Object> {
private final HttpServletRequest request;
HttpRequestAttributeDelegate(HttpServletRequest request) {
this.request = request;
}
path = path.startsWith("/") ? path.substring(1) : path;
Consumer<Publisher<?>> consumer = this.functions.lookup(Consumer.class, path);
if (consumer != null) {
request.setAttribute(WebRequestConstants.CONSUMER, consumer);
return consumer;
}
Function<Object, Object> function = this.functions.lookup(Function.class, path);
if (function != null) {
request.setAttribute(WebRequestConstants.FUNCTION, function);
return function;
}
return null;
}
private Object findFunctionForGet(HttpServletRequest request, String path) {
if (!request.getMethod().equals("GET")) {
return null;
public Object put(String key, Object value) {
this.request.setAttribute(key, value);
return value;
}
path = path.startsWith("/") ? path.substring(1) : path;
Supplier<Publisher<?>> supplier = this.functions.lookup(Supplier.class, path);
if (supplier != null) {
request.setAttribute(WebRequestConstants.SUPPLIER, supplier);
return supplier;
}
StringBuilder builder = new StringBuilder();
String name = path;
String value = null;
for (String element : path.split("/")) {
if (builder.length() > 0) {
builder.append("/");
}
builder.append(element);
name = builder.toString();
value = path.length() > name.length() ? path.substring(name.length() + 1)
: null;
Function<Object, Object> function = this.functions.lookup(Function.class,
name);
if (function != null) {
request.setAttribute(WebRequestConstants.FUNCTION, function);
request.setAttribute(WebRequestConstants.ARGUMENT, value);
return function;
}
}
return null;
}
}

View File

@@ -17,7 +17,6 @@
package org.springframework.cloud.function.web.util;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -36,66 +35,45 @@ public final class FunctionWebUtils {
public static Object findFunction(HttpMethod method, FunctionCatalog functionCatalog,
Map<String, Object> attributes, String path) {
if (method.equals(HttpMethod.GET)) {
return findFunctionForGet(functionCatalog, attributes, path);
}
else if (method.equals(HttpMethod.POST)) {
return findFunctionForPost(functionCatalog, attributes, path);
if (method.equals(HttpMethod.GET) || method.equals(HttpMethod.POST)) {
return doFindFunction(method, functionCatalog, attributes, path);
}
else {
throw new IllegalStateException("HTTP method '" + method + "' is not supported;");
}
}
private static Object findFunctionForGet(FunctionCatalog functionCatalog,
private static Object doFindFunction(HttpMethod method, FunctionCatalog functionCatalog,
Map<String, Object> attributes, String path) {
path = path.startsWith("/") ? path.substring(1) : path;
Object functionForGet = null;
Supplier<Publisher<?>> supplier = functionCatalog.lookup(Supplier.class, path);
if (supplier != null) {
attributes.put(WebRequestConstants.SUPPLIER, supplier);
functionForGet = supplier;
}
else {
StringBuilder builder = new StringBuilder();
String name = path;
String[] splitPath = path.split("/");
Function<Object, Object> function = null;
for (int i = 0; i < splitPath.length && functionForGet == null; i++) {
String element = splitPath[i];
if (builder.length() > 0) {
builder.append("/");
}
builder.append(element);
name = builder.toString();
function = functionCatalog.lookup(Function.class, name);
if (function != null) {
attributes.put(WebRequestConstants.FUNCTION, function);
String value = path.length() > name.length()
? path.substring(name.length() + 1) : null;
attributes.put(WebRequestConstants.ARGUMENT, value);
functionForGet = function;
}
if (method.equals(HttpMethod.GET)) {
Supplier<Publisher<?>> supplier = functionCatalog.lookup(Supplier.class, path);
if (supplier != null) {
attributes.put(WebRequestConstants.SUPPLIER, supplier);
return supplier;
}
}
return functionForGet;
}
private static Object findFunctionForPost(FunctionCatalog functionCatalog,
Map<String, Object> attributes, String path) {
path = path.startsWith("/") ? path.substring(1) : path;
Consumer<Publisher<?>> consumer = functionCatalog.lookup(Consumer.class, path);
if (consumer != null) {
attributes.put(WebRequestConstants.CONSUMER, consumer);
return consumer;
}
Function<Object, Object> function = functionCatalog.lookup(Function.class, path);
if (function != null) {
attributes.put(WebRequestConstants.FUNCTION, function);
return function;
StringBuilder builder = new StringBuilder();
String name = path;
String value = null;
for (String element : path.split("/")) {
if (builder.length() > 0) {
builder.append("/");
}
builder.append(element);
name = builder.toString();
value = path.length() > name.length() ? path.substring(name.length() + 1)
: null;
Function<Object, Object> function = functionCatalog.lookup(Function.class,
name);
if (function != null) {
attributes.put(WebRequestConstants.FUNCTION, function);
if (value != null) {
attributes.put(WebRequestConstants.ARGUMENT, value);
}
return function;
}
}
return null;
}

View File

@@ -83,6 +83,9 @@ public final class HeaderUtils {
name = name.toLowerCase();
Object value = values == null ? null
: (values.size() == 1 ? values.iterator().next() : values);
if (name.toLowerCase().equals(HttpHeaders.CONTENT_TYPE.toLowerCase())) {
name = MessageHeaders.CONTENT_TYPE;
}
map.put(name, value);
}
return new MessageHeaders(map);

View File

@@ -0,0 +1,246 @@
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.mvc;
import java.net.URI;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import org.junit.Test;
import org.junit.runner.RunWith;
import reactor.core.publisher.Flux;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.cloud.function.context.config.RoutingFunction;
import org.springframework.cloud.function.web.RestApplication;
import org.springframework.cloud.function.web.mvc.RoutingFunctionTests.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Oleg Zhurakousky
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = {
"spring.main.web-application-type=servlet",
"spring.cloud.function.web.path=/functions",
"spring.cloud.function.routing.enabled=true"})
@ContextConfiguration(classes = { RestApplication.class, TestConfiguration.class })
public class RoutingFunctionTests {
@Autowired
private TestRestTemplate rest;
@Test
public void testFunctionMessage() throws Exception {
HttpEntity<String> postForEntity = this.rest
.exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME))
.contentType(MediaType.APPLICATION_JSON)
.header("function.name", "employee")
.body("{\"name\":\"Bob\",\"age\":25}"), String.class);
assertThat(postForEntity.getBody()).isEqualTo("{\"name\":\"Bob\",\"age\":25}");
assertThat(postForEntity.getHeaders().containsKey("x-content-type")).isTrue();
assertThat(postForEntity.getHeaders().get("x-content-type").get(0))
.isEqualTo("application/xml");
assertThat(postForEntity.getHeaders().get("foo").get(0)).isEqualTo("bar");
}
@Test
public void testFunctionPrimitive() throws Exception {
ResponseEntity<String> postForEntity = this.rest
.exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME))
.contentType(MediaType.TEXT_PLAIN)
.header("function.name", "echo")
.body("{\"name\":\"Bob\",\"age\":25}"), String.class);
assertThat(postForEntity.getBody()).isEqualTo("{\"name\":\"Bob\",\"age\":25}");
assertThat(postForEntity.getStatusCode()).isEqualTo(HttpStatus.OK);
}
@Test
public void testFluxFunctionPrimitive() throws Exception {
ResponseEntity<String> postForEntity = this.rest
.exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME))
.contentType(MediaType.TEXT_PLAIN)
.header("function.name", "fluxuppercase")
.body("hello"), String.class);
assertThat(postForEntity.getBody()).isEqualTo("[\"HELLO\"]");
assertThat(postForEntity.getStatusCode()).isEqualTo(HttpStatus.OK);
postForEntity = this.rest
.exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME))
.contentType(MediaType.TEXT_PLAIN)
.header("function.name", "fluxuppercase")
.body("hello1"), String.class);
assertThat(postForEntity.getBody()).isEqualTo("[\"HELLO1\"]");
assertThat(postForEntity.getStatusCode()).isEqualTo(HttpStatus.OK);
postForEntity = this.rest
.exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME))
.contentType(MediaType.TEXT_PLAIN)
.header("function.name", "fluxuppercase")
.body("hello2"), String.class);
assertThat(postForEntity.getBody()).isEqualTo("[\"HELLO2\"]");
assertThat(postForEntity.getStatusCode()).isEqualTo(HttpStatus.OK);
}
@Test
public void testFluxFunctionPrimitiveArray() throws Exception {
ResponseEntity<String> postForEntity = this.rest
.exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME))
.contentType(MediaType.APPLICATION_JSON)
.header("function.name", "fluxuppercase")
.body(new String[] {"a", "b", "c"}), String.class);
assertThat(postForEntity.getBody()).isEqualTo("[\"A\",\"B\",\"C\"]");
assertThat(postForEntity.getStatusCode()).isEqualTo(HttpStatus.OK);
}
@Test
public void testFluxConsumer() throws Exception {
ResponseEntity<String> postForEntity = this.rest
.exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME))
.contentType(MediaType.APPLICATION_JSON)
.header("function.name", "fluxconsumer")
.body(new String[] {"a", "b", "c"}), String.class);
//assertThat(postForEntity.getBody()).isEqualTo("[\"A\",\"B\",\"C\"]");
assertThat(postForEntity.getStatusCode()).isEqualTo(HttpStatus.OK);
}
@Test
public void testFunctionPojo() throws Exception {
ResponseEntity<String> postForEntity = this.rest
.exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME))
.contentType(MediaType.APPLICATION_JSON)
.header("function.name", "echoPojo")
.body("{\"value\":\"foo\"}"), String.class);
assertThat(postForEntity.getBody()).isEqualTo("{\"foo\":{\"value\":\"foo\"},\"value\":\"bar\"}");
assertThat(postForEntity.getStatusCode()).isEqualTo(HttpStatus.OK);
}
@Test
public void testConsumerMessage() throws Exception {
ResponseEntity<String> postForEntity = this.rest
.exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME))
.contentType(MediaType.TEXT_PLAIN)
.header("function.name", "messageConsumer")
.body("{\"name\":\"Bob\",\"age\":25}"), String.class);
assertThat(postForEntity.getStatusCode()).isEqualTo(HttpStatus.OK);
}
@EnableAutoConfiguration
@org.springframework.boot.test.context.TestConfiguration
protected static class TestConfiguration {
@Bean({ "employee" })
public Function<Message<Map<String, Object>>, Message<Map<String, Object>>> function() {
return request -> {
Message<Map<String, Object>> message = MessageBuilder
.withPayload(request.getPayload())
.setHeader("X-Content-Type", "application/xml")
.setHeader("foo", "bar").build();
return message;
};
}
@Bean
public Consumer<Message<String>> messageConsumer() {
return value -> System.out.println("Value: " + value);
}
@Bean
public Function<String, String> echo() {
return v -> v;
}
@Bean
public Function<Flux<String>, Flux<String>> fluxuppercase() {
return v -> v.map(s -> {
System.out.println(s);
return s.toUpperCase();
});
}
@Bean
public Consumer<Flux<String>> fluxconsumer() {
// return v -> v.map(value -> {
// return value.toUpperCase();
// });
return flux -> flux.doOnNext(s -> {
System.out.println(s + " jkljlkjlkj l");
}).subscribe();
}
@Bean
public Function<Foo, Bar> echoPojo() {
return v -> {
Bar bar = new Bar();
bar.setFoo(v);
bar.setValue("bar");
return bar;
};
}
}
public static class Foo {
private String value;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
public static class Bar {
private Foo foo;
private String value;
public Foo getFoo() {
return foo;
}
public void setFoo(Foo foo) {
this.foo = foo;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
}