diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java index 713a8c40e..aa3e77ba3 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java @@ -215,6 +215,10 @@ public class ContextFunctionCatalogAutoConfiguration { .getGenericReturnType()); type = (ParameterizedType) type.getActualTypeArguments()[0]; Type param = type.getActualTypeArguments()[0]; + if (param instanceof ParameterizedType) { + ParameterizedType concrete = (ParameterizedType) param; + param = concrete.getRawType(); + } return ClassUtils.resolveClassName(param.getTypeName(), registry.getClass().getClassLoader()); } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestApplication.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestApplication.java index ef9279a66..63011f8a3 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestApplication.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestApplication.java @@ -16,29 +16,14 @@ package org.springframework.cloud.function.web; -import java.nio.CharBuffer; -import java.nio.charset.Charset; -import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.function.IntPredicate; - -import org.reactivestreams.Publisher; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.core.ResolvableType; -import org.springframework.core.codec.StringDecoder; -import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.http.MediaType; -import org.springframework.http.codec.DecoderHttpMessageReader; import org.springframework.http.codec.HttpMessageReader; -import org.springframework.util.MimeType; +import org.springframework.http.codec.ServerSentEventHttpMessageReader; import org.springframework.web.reactive.config.WebReactiveConfigurer; -import reactor.core.publisher.Flux; - /** * @author Mark Fisher */ @@ -47,66 +32,10 @@ public class RestApplication implements WebReactiveConfigurer { @Override public void extendMessageReaders(List> readers) { - readers.add(0, new DecoderHttpMessageReader<>(new SseDecoder())); + readers.add(0, new ServerSentEventHttpMessageReader()); } public static void main(String[] args) { SpringApplication.run(RestApplication.class, args); } -} - -class SseDecoder extends StringDecoder { - - private static final IntPredicate NEWLINE_DELIMITER = b -> b == '\n' || b == '\r'; - - @Override - public boolean canDecode(ResolvableType elementType, MimeType mimeType) { - return super.canDecode(elementType, mimeType) - && MediaType.TEXT_EVENT_STREAM.isCompatibleWith(mimeType); - } - - @Override - public Flux decode(Publisher inputStream, - ResolvableType elementType, MimeType mimeType, Map hints) { - - Flux inputFlux = Flux.from(inputStream); - inputFlux = Flux.from(inputStream).flatMap(SseDecoder::splitOnNewline); - return inputFlux.map(buffer -> decodeDataBuffer(buffer, mimeType)); - } - - private static Flux splitOnNewline(DataBuffer dataBuffer) { - List results = new ArrayList<>(); - int startIdx = 0; - int endIdx; - final int limit = dataBuffer.readableByteCount(); - do { - endIdx = dataBuffer.indexOf(NEWLINE_DELIMITER, startIdx); - endIdx = dataBuffer.indexOf(NEWLINE_DELIMITER, endIdx + 1); - int length = (endIdx != -1 ? endIdx - startIdx + 1 : limit - startIdx) - 7; - if (length > 0) { - DataBuffer token = dataBuffer.slice(startIdx + 5, length); - results.add(DataBufferUtils.retain(token)); - } - startIdx = endIdx + 1; - } - while (startIdx < limit && endIdx != -1); - DataBufferUtils.release(dataBuffer); - return Flux.fromIterable(results); - } - - private String decodeDataBuffer(DataBuffer dataBuffer, MimeType mimeType) { - Charset charset = getCharset(mimeType); - CharBuffer charBuffer = charset.decode(dataBuffer.asByteBuffer()); - DataBufferUtils.release(dataBuffer); - return charBuffer.toString(); - } - - private Charset getCharset(MimeType mimeType) { - if (mimeType != null && mimeType.getCharset() != null) { - return mimeType.getCharset(); - } - else { - return DEFAULT_CHARSET; - } - } } \ No newline at end of file diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java index d0c83db43..fde19e269 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java @@ -16,9 +16,12 @@ package org.springframework.cloud.function.web; import java.net.URI; +import java.util.HashMap; +import java.util.Map; import java.util.function.Function; import java.util.function.Supplier; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; @@ -58,6 +61,16 @@ public class RestApplicationTests { String.class).getBody()).isEqualTo(sse("foo", "bar")); } + @Test + public void wordsJson() throws Exception { + assertThat(rest + .exchange( + RequestEntity.get(new URI("http://localhost:" + port + "/words")) + .accept(MediaType.APPLICATION_JSON).build(), + String.class) + .getBody()).isEqualTo("[\"foo\",\"bar\"]"); + } + @Test public void words() throws Exception { assertThat(rest.exchange( @@ -71,6 +84,32 @@ public class RestApplicationTests { "foo\nbar", String.class)).isEqualTo("[FOO][BAR]"); } + @Test + @Ignore("Not working even though the JSON stream is") + // Not working because we convert the JSON to objects in + // FunctionExtractingFunctionCatalog, so all we need from Spring Reactive is a String, + // and the StringDecoder splits on newlines + public void uppercaseJsonArray() throws Exception { + assertThat(rest + .exchange( + RequestEntity.post(new URI("http://localhost:" + port + "/maps")) + .contentType(MediaType.APPLICATION_JSON) + .body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), + String.class) + .getBody()).isEqualTo("{\"value\":\"FOO\"}{\"value\":\"BAR\"}"); + } + + @Test + public void uppercaseJsonStream() throws Exception { + assertThat(rest + .exchange( + RequestEntity.post(new URI("http://localhost:" + port + "/maps")) + .contentType(MediaType.APPLICATION_JSON) + .body("{\"value\":\"foo\"}\n{\"value\":\"bar\"}"), + String.class) + .getBody()).isEqualTo("{\"value\":\"FOO\"}{\"value\":\"BAR\"}"); + } + @Test public void uppercaseSSE() throws Exception { assertThat(rest.exchange( @@ -92,6 +131,14 @@ public class RestApplicationTests { return flux -> flux.map(value -> "[" + value.trim().toUpperCase() + "]"); } + @Bean + public Function>, Flux>> maps() { + return flux -> flux.map(value -> { + value.put("value", value.get("value").trim().toUpperCase()); + return value; + }); + } + @Bean public Supplier> words() { return () -> Flux.fromArray(new String[] { "foo", "bar" });