Double check that a Function can return a Mono
This commit is contained in:
@@ -149,12 +149,25 @@ public class FunctionType {
|
||||
.getType());
|
||||
}
|
||||
|
||||
public FunctionType wrap(Class<?> wrapper) {
|
||||
if (wrapper.isAssignableFrom(getInputWrapper()) || !isWrapper(wrapper)) {
|
||||
public FunctionType wrap(Class<?> input, Class<?> output) {
|
||||
if (!isWrapper(input) && !isWrapper(output)) {
|
||||
return this;
|
||||
}
|
||||
if (!isWrapper(input) || !isWrapper(output)) {
|
||||
throw new IllegalArgumentException("Both wrapper types must be wrappers in ("
|
||||
+ input + ", " + output + ")");
|
||||
}
|
||||
if (input.isAssignableFrom(getInputWrapper())
|
||||
&& output.isAssignableFrom(getOutputWrapper())) {
|
||||
return this;
|
||||
}
|
||||
return new FunctionType(ResolvableType.forClassWithGenerics(Function.class,
|
||||
wrap(wrapper, getInputType()), wrap(wrapper, getOutputType())).getType());
|
||||
wrapper(input, getInputType()), wrapper(output, getOutputType()))
|
||||
.getType());
|
||||
}
|
||||
|
||||
public FunctionType wrap(Class<?> wrapper) {
|
||||
return wrap(wrapper, wrapper);
|
||||
}
|
||||
|
||||
public static FunctionType compose(FunctionType input, FunctionType output) {
|
||||
@@ -173,7 +186,7 @@ public class FunctionType {
|
||||
.getType());
|
||||
}
|
||||
|
||||
private ResolvableType wrap(Class<?> wrapper, Class<?> type) {
|
||||
private ResolvableType wrapper(Class<?> wrapper, Class<?> type) {
|
||||
return wrap(this, wrapper, type);
|
||||
}
|
||||
|
||||
|
||||
@@ -17,7 +17,9 @@
|
||||
package org.springframework.cloud.function.context.config;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
@@ -78,6 +80,22 @@ public class BeanFactoryFunctionCatalogTests {
|
||||
assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("i=2");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void registerFunctionWithMonoType() {
|
||||
processor.register(
|
||||
new FunctionRegistration<Function<Flux<String>, Mono<Map<String, Integer>>>>(
|
||||
flux -> flux.collect(HashMap::new,
|
||||
(map, word) -> map.merge(word, 1, Integer::sum)))
|
||||
.names("foos")
|
||||
.type(FunctionType.from(String.class)
|
||||
.to(Map.class)
|
||||
.wrap(Flux.class, Mono.class).getType()));
|
||||
Function<Flux<String>, Mono<Map<String, Integer>>> foos = processor
|
||||
.lookup(Function.class, "");
|
||||
assertThat(foos.apply(Flux.just("one", "one", "two")).block())
|
||||
.containsEntry("one", 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void lookupNonExistentConsumerWithEmptyName() {
|
||||
processor.register(new FunctionRegistration<>(new Foos()).names("foos"));
|
||||
|
||||
@@ -19,6 +19,7 @@ package org.springframework.cloud.function.context.config;
|
||||
import java.net.URL;
|
||||
import java.net.URLClassLoader;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
@@ -236,6 +237,22 @@ public class ContextFunctionCatalogAutoConfigurationTests {
|
||||
.isAssignableFrom(Publisher.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void monoFunction() {
|
||||
create(MonoConfiguration.class);
|
||||
assertThat(context.getBean("function")).isInstanceOf(Function.class);
|
||||
assertThat(catalog.<Function<?, ?>>lookup(Function.class, "function"))
|
||||
.isInstanceOf(Function.class);
|
||||
assertThat(inspector.isMessage(catalog.lookup(Function.class, "function")))
|
||||
.isFalse();
|
||||
assertThat(inspector.getInputType(catalog.lookup(Function.class, "function")))
|
||||
.isAssignableFrom(String.class);
|
||||
assertThat(inspector.getInputWrapper(catalog.lookup(Function.class, "function")))
|
||||
.isAssignableFrom(Flux.class);
|
||||
assertThat(inspector.getOutputWrapper(catalog.lookup(Function.class, "function")))
|
||||
.isAssignableFrom(Mono.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void messageFunction() {
|
||||
create(MessageConfiguration.class);
|
||||
@@ -756,6 +773,16 @@ public class ContextFunctionCatalogAutoConfigurationTests {
|
||||
}
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
@Configuration
|
||||
protected static class MonoConfiguration {
|
||||
@Bean
|
||||
public Function<Flux<String>, Mono<Map<String, Integer>>> function() {
|
||||
return flux -> flux.collect(HashMap::new,
|
||||
(map, word) -> map.merge(word, 1, Integer::sum));
|
||||
}
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
@Configuration
|
||||
protected static class MessageConfiguration {
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
package org.springframework.cloud.function.web.flux;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
@@ -124,12 +125,24 @@ public class FunctionController {
|
||||
return Mono.from(result);
|
||||
}
|
||||
|
||||
if (isInputMultiple(handler) && isOutputSingle(handler)) {
|
||||
request.setAttribute(WebRequestConstants.OUTPUT_SINGLE, true,
|
||||
WebRequest.SCOPE_REQUEST);
|
||||
return Mono.from(result);
|
||||
}
|
||||
|
||||
request.setAttribute(WebRequestConstants.OUTPUT_SINGLE, false,
|
||||
WebRequest.SCOPE_REQUEST);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private boolean isInputMultiple(Object handler) {
|
||||
Class<?> type = inspector.getInputType(handler);
|
||||
Class<?> wrapper = inspector.getInputWrapper(handler);
|
||||
return Collection.class.isAssignableFrom(type) || Flux.class.equals(wrapper);
|
||||
}
|
||||
|
||||
private boolean isOutputSingle(Object handler) {
|
||||
Class<?> type = inspector.getOutputType(handler);
|
||||
Class<?> wrapper = inspector.getOutputWrapper(handler);
|
||||
|
||||
@@ -15,10 +15,24 @@
|
||||
*/
|
||||
package org.springframework.cloud.function.web;
|
||||
|
||||
import java.net.URI;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
@@ -38,18 +52,12 @@ import org.springframework.test.context.junit4.SpringRunner;
|
||||
import org.springframework.util.LinkedMultiValueMap;
|
||||
import org.springframework.util.MultiValueMap;
|
||||
import org.springframework.util.StringUtils;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import java.net.URI;
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
/**
|
||||
* @author Dave Syer
|
||||
*/
|
||||
@@ -219,7 +227,7 @@ public class RestApplicationTests {
|
||||
assertThat(rest.exchange(
|
||||
RequestEntity.get(new URI("/sentences")).accept(MediaType.ALL).build(),
|
||||
String.class).getBody())
|
||||
.isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
|
||||
.isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -418,7 +426,7 @@ public class RestApplicationTests {
|
||||
// The new line in the middle is optional
|
||||
.body("[{\"value\":\"foo\"},\n{\"value\":\"bar\"}]"),
|
||||
String.class).getBody())
|
||||
.isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]");
|
||||
.isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -426,7 +434,7 @@ public class RestApplicationTests {
|
||||
assertThat(rest.exchange(RequestEntity.post(new URI("/uppercase"))
|
||||
.accept(EVENT_STREAM).contentType(MediaType.APPLICATION_JSON)
|
||||
.body("[\"foo\",\"bar\"]"), String.class).getBody())
|
||||
.isEqualTo(sse("(FOO)", "(BAR)"));
|
||||
.isEqualTo(sse("(FOO)", "(BAR)"));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -437,10 +445,19 @@ public class RestApplicationTests {
|
||||
map.put("A", Arrays.asList("1", "2", "3"));
|
||||
map.put("B", Arrays.asList("5", "6"));
|
||||
|
||||
assertThat(rest.exchange(RequestEntity.post(new URI("/sum"))
|
||||
.accept(MediaType.APPLICATION_JSON).contentType(MediaType.MULTIPART_FORM_DATA)
|
||||
.body(map), String.class).getBody())
|
||||
.isEqualTo("[{\"A\":6,\"B\":11}]");
|
||||
assertThat(rest.exchange(
|
||||
RequestEntity.post(new URI("/sum")).accept(MediaType.APPLICATION_JSON)
|
||||
.contentType(MediaType.MULTIPART_FORM_DATA).body(map),
|
||||
String.class).getBody()).isEqualTo("[{\"A\":6,\"B\":11}]");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void count() throws Exception {
|
||||
List<String> list = Arrays.asList("A", "B", "A");
|
||||
assertThat(rest.exchange(
|
||||
RequestEntity.post(new URI("/count")).accept(MediaType.APPLICATION_JSON)
|
||||
.contentType(MediaType.APPLICATION_JSON).body(list),
|
||||
String.class).getBody()).isEqualTo("{\"A\":2,\"B\":1}");
|
||||
}
|
||||
|
||||
private String sse(String... values) {
|
||||
@@ -594,18 +611,16 @@ public class RestApplicationTests {
|
||||
|
||||
@Bean
|
||||
public Function<MultiValueMap<String, String>, Map<String, Integer>> sum() {
|
||||
return valueMap -> valueMap
|
||||
.entrySet()
|
||||
.stream()
|
||||
.collect(
|
||||
Collectors
|
||||
.toMap(
|
||||
Map.Entry::getKey,
|
||||
values -> values.getValue().stream().mapToInt(Integer::parseInt).sum()
|
||||
)
|
||||
);
|
||||
return valueMap -> valueMap.entrySet().stream()
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, values -> values
|
||||
.getValue().stream().mapToInt(Integer::parseInt).sum()));
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Function<Flux<String>, Mono<Map<String, Integer>>> count() {
|
||||
return flux -> flux.collect(HashMap::new,
|
||||
(map, word) -> map.merge(word, 1, Integer::sum));
|
||||
}
|
||||
}
|
||||
|
||||
public static class Foo {
|
||||
|
||||
Reference in New Issue
Block a user