@@ -16,19 +16,13 @@
|
||||
|
||||
package org.springframework.cloud.function.web.flux;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.reactivestreams.Publisher;
|
||||
|
||||
import org.springframework.cloud.function.context.catalog.FunctionInspector;
|
||||
import org.springframework.cloud.function.context.message.MessageUtils;
|
||||
import org.springframework.cloud.function.web.flux.constants.WebRequestConstants;
|
||||
import org.springframework.cloud.function.web.flux.request.FluxFormRequest;
|
||||
import org.springframework.cloud.function.web.flux.request.FluxRequest;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
@@ -38,10 +32,15 @@ import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.ResponseBody;
|
||||
import org.springframework.web.context.request.WebRequest;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
* @author Dave Syer
|
||||
* @author Mark Fisher
|
||||
@@ -68,18 +67,21 @@ public class FunctionController {
|
||||
|
||||
@PostMapping(path = "/**")
|
||||
@ResponseBody
|
||||
public ResponseEntity<Publisher<?>> post(WebRequest request,
|
||||
@RequestBody FluxRequest<?> body) {
|
||||
public ResponseEntity<Publisher<?>> post(WebRequest request, @RequestBody FluxRequest<?> body) {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
Function<Flux<?>, Flux<?>> function = (Function<Flux<?>, Flux<?>>) request
|
||||
.getAttribute(WebRequestConstants.FUNCTION, WebRequest.SCOPE_REQUEST);
|
||||
@SuppressWarnings("unchecked")
|
||||
Consumer<Flux<?>> consumer = (Consumer<Flux<?>>) request
|
||||
.getAttribute(WebRequestConstants.CONSUMER, WebRequest.SCOPE_REQUEST);
|
||||
Boolean single = (Boolean) request.getAttribute(WebRequestConstants.INPUT_SINGLE,
|
||||
WebRequest.SCOPE_REQUEST);
|
||||
Boolean single = (Boolean) request
|
||||
.getAttribute(WebRequestConstants.INPUT_SINGLE, WebRequest.SCOPE_REQUEST);
|
||||
|
||||
FluxFormRequest form = FluxFormRequest.from(request.getParameterMap());
|
||||
|
||||
if (function != null) {
|
||||
Flux<?> flux = body.flux();
|
||||
Flux<?> flux = body.body() == null ? form.flux() : body.flux();
|
||||
if (debug) {
|
||||
flux = flux.log();
|
||||
}
|
||||
@@ -90,11 +92,11 @@ public class FunctionController {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Handled POST with function");
|
||||
}
|
||||
return ResponseEntity.ok().body(
|
||||
debug ? result.log() : response(request, function, single, result));
|
||||
return ResponseEntity.ok().body(debug ? result.log() : response(request, function, single, result));
|
||||
}
|
||||
|
||||
if (consumer != null) {
|
||||
Flux<?> flux = body.flux().cache(); // send a copy back to the caller
|
||||
Flux<?> flux = body.body() == null ? form.flux().cache() : body.flux().cache(); // send a copy back to the caller
|
||||
if (debug) {
|
||||
flux = flux.log();
|
||||
}
|
||||
@@ -104,18 +106,19 @@ public class FunctionController {
|
||||
}
|
||||
return ResponseEntity.status(HttpStatus.ACCEPTED).body(flux);
|
||||
}
|
||||
|
||||
throw new IllegalArgumentException("no such function");
|
||||
}
|
||||
|
||||
private Publisher<?> response(WebRequest request, Object handler, Boolean single,
|
||||
Flux<?> result) {
|
||||
private Publisher<?> response(WebRequest request, Object handler, Boolean single, Flux<?> result) {
|
||||
|
||||
if (single != null && single && isOutputSingle(handler)) {
|
||||
request.setAttribute(WebRequestConstants.OUTPUT_SINGLE, true,
|
||||
WebRequest.SCOPE_REQUEST);
|
||||
request.setAttribute(WebRequestConstants.OUTPUT_SINGLE, true, WebRequest.SCOPE_REQUEST);
|
||||
return Mono.from(result);
|
||||
}
|
||||
request.setAttribute(WebRequestConstants.OUTPUT_SINGLE, false,
|
||||
WebRequest.SCOPE_REQUEST);
|
||||
|
||||
request.setAttribute(WebRequestConstants.OUTPUT_SINGLE, false, WebRequest.SCOPE_REQUEST);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
@@ -128,10 +131,7 @@ public class FunctionController {
|
||||
if (wrapper == type) {
|
||||
return true;
|
||||
}
|
||||
if (Mono.class.equals(wrapper) || Optional.class.equals(wrapper)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
return Mono.class.equals(wrapper) || Optional.class.equals(wrapper);
|
||||
}
|
||||
|
||||
@GetMapping(path = "/**")
|
||||
@@ -143,8 +143,9 @@ public class FunctionController {
|
||||
@SuppressWarnings("unchecked")
|
||||
Supplier<Flux<?>> supplier = (Supplier<Flux<?>>) request
|
||||
.getAttribute(WebRequestConstants.SUPPLIER, WebRequest.SCOPE_REQUEST);
|
||||
String argument = (String) request.getAttribute(WebRequestConstants.ARGUMENT,
|
||||
WebRequest.SCOPE_REQUEST);
|
||||
String argument = (String) request
|
||||
.getAttribute(WebRequestConstants.ARGUMENT, WebRequest.SCOPE_REQUEST);
|
||||
|
||||
if (function != null) {
|
||||
return value(function, argument);
|
||||
}
|
||||
|
||||
@@ -18,9 +18,8 @@ package org.springframework.cloud.function.web.flux.constants;
|
||||
|
||||
/**
|
||||
* Common storage for web request attribute names (in a separate package to avoid cycles).
|
||||
*
|
||||
* @author Dave Syer
|
||||
*
|
||||
* @author Dave Syer
|
||||
*/
|
||||
public abstract class WebRequestConstants {
|
||||
|
||||
@@ -32,7 +31,8 @@ public abstract class WebRequestConstants {
|
||||
+ ".supplier";
|
||||
public static final String ARGUMENT = WebRequestConstants.class.getName()
|
||||
+ ".argument";
|
||||
public static final String HANDLER = WebRequestConstants.class.getName() + ".handler";
|
||||
public static final String HANDLER = WebRequestConstants.class.getName()
|
||||
+ ".handler";
|
||||
public static final String INPUT_SINGLE = WebRequestConstants.class.getName()
|
||||
+ ".input_single";
|
||||
public static final String OUTPUT_SINGLE = WebRequestConstants.class.getName()
|
||||
|
||||
@@ -0,0 +1,41 @@
|
||||
package org.springframework.cloud.function.web.flux.request;
|
||||
|
||||
import org.springframework.util.LinkedMultiValueMap;
|
||||
import org.springframework.util.MultiValueMap;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
|
||||
public class FluxFormRequest<K, V> {
|
||||
|
||||
private Map<K, V[]> map;
|
||||
|
||||
public FluxFormRequest(Map<K, V[]> map) {
|
||||
this.map = map;
|
||||
}
|
||||
|
||||
public static <K, V> FluxFormRequest<K, V> from(Map<K, V[]> map) {
|
||||
return new FluxFormRequest<>(map);
|
||||
}
|
||||
|
||||
public Flux<MultiValueMap<K, V>> flux() {
|
||||
return Flux.just(buildMap());
|
||||
}
|
||||
|
||||
public MultiValueMap<K, V> body() {
|
||||
return buildMap();
|
||||
}
|
||||
|
||||
private MultiValueMap<K, V> buildMap() {
|
||||
|
||||
if (map == null)
|
||||
return null;
|
||||
|
||||
MultiValueMap<K, V> result = new LinkedMultiValueMap<>();
|
||||
map.forEach((key, values) -> result.put(key, Arrays.asList(values)));
|
||||
return result;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -15,23 +15,10 @@
|
||||
*/
|
||||
package org.springframework.cloud.function.web;
|
||||
|
||||
import java.net.URI;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
@@ -48,15 +35,23 @@ import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
import org.springframework.util.LinkedMultiValueMap;
|
||||
import org.springframework.util.MultiValueMap;
|
||||
import org.springframework.util.StringUtils;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import java.net.URI;
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
/**
|
||||
* @author Dave Syer
|
||||
*
|
||||
*/
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
|
||||
@@ -76,7 +71,7 @@ public class RestApplicationTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void staticResource() throws Exception {
|
||||
public void staticResource() {
|
||||
assertThat(rest.getForObject("/test.html", String.class)).contains("<body>Test");
|
||||
}
|
||||
|
||||
@@ -224,7 +219,7 @@ public class RestApplicationTests {
|
||||
assertThat(rest.exchange(
|
||||
RequestEntity.get(new URI("/sentences")).accept(MediaType.ALL).build(),
|
||||
String.class).getBody())
|
||||
.isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
|
||||
.isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -415,7 +410,7 @@ public class RestApplicationTests {
|
||||
// The new line in the middle is optional
|
||||
.body("[{\"value\":\"foo\"},\n{\"value\":\"bar\"}]"),
|
||||
String.class).getBody())
|
||||
.isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]");
|
||||
.isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -423,7 +418,21 @@ public class RestApplicationTests {
|
||||
assertThat(rest.exchange(RequestEntity.post(new URI("/uppercase"))
|
||||
.accept(EVENT_STREAM).contentType(MediaType.APPLICATION_JSON)
|
||||
.body("[\"foo\",\"bar\"]"), String.class).getBody())
|
||||
.isEqualTo(sse("(FOO)", "(BAR)"));
|
||||
.isEqualTo(sse("(FOO)", "(BAR)"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void sum() throws Exception {
|
||||
|
||||
LinkedMultiValueMap<String, String> map = new LinkedMultiValueMap<>();
|
||||
|
||||
map.put("A", Arrays.asList("1", "2", "3"));
|
||||
map.put("B", Arrays.asList("5", "6"));
|
||||
|
||||
assertThat(rest.exchange(RequestEntity.post(new URI("/sum"))
|
||||
.accept(MediaType.APPLICATION_JSON).contentType(MediaType.MULTIPART_FORM_DATA)
|
||||
.body(map), String.class).getBody())
|
||||
.isEqualTo("[{\"A\":6,\"B\":11}]");
|
||||
}
|
||||
|
||||
private String sse(String... values) {
|
||||
@@ -575,6 +584,20 @@ public class RestApplicationTests {
|
||||
Arrays.asList("come", "back"));
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Function<MultiValueMap<String, String>, Map<String, Integer>> sum() {
|
||||
return valueMap -> valueMap
|
||||
.entrySet()
|
||||
.stream()
|
||||
.collect(
|
||||
Collectors
|
||||
.toMap(
|
||||
Map.Entry::getKey,
|
||||
values -> values.getValue().stream().mapToInt(Integer::parseInt).sum()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Foo {
|
||||
|
||||
Reference in New Issue
Block a user