Migrate to servlet binder for web features

This commit is contained in:
Dave Syer
2017-08-08 08:27:04 +01:00
parent 540b4d378e
commit 1af0d451cf
107 changed files with 4055 additions and 2010 deletions

View File

@@ -1,61 +0,0 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.stream;
import java.util.function.Supplier;
import org.springframework.cloud.function.core.FunctionCatalog;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
/**
* @author Mark Fisher
*/
public class SupplierInvokingMessageProducer<T> extends MessageProducerSupport {
private final FunctionCatalog functionCatalog;
public SupplierInvokingMessageProducer(FunctionCatalog registry) {
this.functionCatalog = registry;
this.setOutputChannelName(Source.OUTPUT);
}
@Override
protected void doStart() {
supplier().subscribeOn(Schedulers.elastic()).subscribe(m -> this.sendMessage(m));
}
private Flux<Message<?>> supplier() {
Supplier<Flux<?>> supplier = null;
Flux<Message<?>> result = Flux.empty();
for (String name : functionCatalog.getSupplierNames()) {
supplier = functionCatalog.lookupSupplier(name);
Assert.notNull(supplier, "Supplier must not be null");
result = Flux.merge(result,
supplier.get().map(payload -> MessageBuilder.withPayload(payload)
.setHeader(StreamConfigurationProperties.ROUTE_KEY, name)
.build()));
}
return result;
}
}

View File

@@ -0,0 +1,42 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.stream.config;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration;
import org.springframework.cloud.function.core.FunctionCatalog;
import org.springframework.cloud.stream.binder.servlet.RouteRegistry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Dave Syer
*
*/
@Configuration
@ConditionalOnBean(FunctionCatalog.class)
@ConditionalOnClass(RouteRegistry.class)
@AutoConfigureAfter(ContextFunctionCatalogAutoConfiguration.class)
public class RouteRegistryAutoConfiguration {
@Bean
public RouteRegistry supplierRoutes(FunctionCatalog registry) {
return () -> registry.getSupplierNames();
}
}

View File

@@ -14,14 +14,14 @@
* limitations under the License.
*/
package org.springframework.cloud.function.stream;
package org.springframework.cloud.function.stream.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.function.context.FunctionInspector;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.core.FunctionCatalog;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binder.Binder;
@@ -39,7 +39,7 @@ import org.springframework.context.annotation.Lazy;
@ConditionalOnBean(FunctionCatalog.class)
@ConditionalOnProperty(name = "spring.cloud.stream.enabled", havingValue = "true", matchIfMissing = true)
@EnableBinding(Processor.class)
public class StreamConfiguration {
public class StreamAutoConfiguration {
@Autowired
private StreamConfigurationProperties properties;

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.function.stream;
package org.springframework.cloud.function.stream.config;
import org.springframework.boot.context.properties.ConfigurationProperties;

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.function.stream;
package org.springframework.cloud.function.stream.config;
import java.util.ArrayList;
import java.util.HashMap;
@@ -26,7 +26,7 @@ import java.util.function.Consumer;
import java.util.function.Function;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.cloud.function.context.FunctionInspector;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.core.FunctionCatalog;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
@@ -101,7 +101,11 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
}
private Message<?> message(Object result, Map<String, Object> headers) {
return result instanceof Message ? (Message<?>) result
return result instanceof Message
// TODO: why do we have to do this? The headers should have come with the
// result.
? MessageBuilder.fromMessage((Message<?>) result)
.copyHeadersIfAbsent(headers).build()
: MessageBuilder.withPayload(result).copyHeadersIfAbsent(headers).build();
}

View File

@@ -0,0 +1,110 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.stream.config;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import org.springframework.cloud.function.core.FunctionCatalog;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
/**
* @author Mark Fisher
*/
public class SupplierInvokingMessageProducer<T> extends MessageProducerSupport {
private final FunctionCatalog functionCatalog;
private final Set<String> suppliers = new HashSet<>();
private final Map<String, Disposable> disposables = new HashMap<>();
public SupplierInvokingMessageProducer(FunctionCatalog registry) {
this.functionCatalog = registry;
this.setOutputChannelName(Source.OUTPUT);
}
@Override
protected void doStart() {
for (String name : functionCatalog.getSupplierNames()) {
start(name);
}
}
@Override
protected void doStop() {
for (String name : new HashSet<>(suppliers)) {
stop(name);
}
}
public void stop(String name) {
if (disposables.containsKey(name)) {
synchronized (disposables) {
if (disposables.containsKey(name)) {
try {
disposables.get(name).dispose();
}
finally {
disposables.remove(name);
suppliers.remove(name);
}
}
}
}
}
public void start(String name) {
if (!disposables.containsKey(name)) {
synchronized (disposables) {
if (!disposables.containsKey(name)) {
Supplier<Flux<?>> supplier = functionCatalog.lookupSupplier(name);
if (supplier != null) {
suppliers.add(name);
disposables.put(name,
supplier.get().subscribeOn(Schedulers.elastic()).subscribe(m -> send(name, m)));
}
}
}
}
}
private void send(String name, Object payload) {
Message<?> message;
if (payload instanceof Message) {
message = MessageBuilder.fromMessage((Message<?>) payload)
.setHeaderIfAbsent(StreamConfigurationProperties.ROUTE_KEY, name)
.build();
}
else {
message = MessageBuilder.withPayload(payload)
.setHeader(StreamConfigurationProperties.ROUTE_KEY, name).build();
}
getOutputChannel().send(message);
}
}

View File

@@ -1,2 +1,3 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.function.stream.StreamConfiguration
org.springframework.cloud.function.stream.config.StreamAutoConfiguration,\
org.springframework.cloud.function.stream.config.RouteRegistryAutoConfiguration

View File

@@ -29,7 +29,7 @@ import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.function.stream.StreamConfigurationProperties;
import org.springframework.cloud.function.stream.config.StreamConfigurationProperties;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.context.annotation.Bean;

View File

@@ -29,7 +29,7 @@ import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.function.stream.StreamConfigurationProperties;
import org.springframework.cloud.function.stream.config.StreamConfigurationProperties;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.context.annotation.Bean;

View File

@@ -0,0 +1,88 @@
/*
* Copyright 2012-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.stream.scan;
import java.net.URI;
import java.util.function.Function;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.embedded.LocalServerPort;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
import reactor.core.publisher.Flux;
/**
* @author Dave Syer
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
public class ComponentTests {
@LocalServerPort
private int port;
@Autowired
private Greeter greeter;
@Autowired
private TestRestTemplate rest;
@Test
public void contextLoads() throws Exception {
assertThat(greeter).isNotNull();
}
@Test
public void greeter() throws Exception {
ResponseEntity<String> result = rest
.exchange(
RequestEntity.post(new URI("/stream/greeter"))
.contentType(MediaType.TEXT_PLAIN).body("World"),
String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("Hello World");
}
@SpringBootApplication(exclude=TestSupportBinderAutoConfiguration.class)
@ComponentScan
protected static class TestConfiguration {
}
@Component("greeter")
protected static class Greeter implements Function<Flux<String>, Flux<String>> {
@Override
public Flux<String> apply(Flux<String> flux) {
return flux.map(name -> "Hello " + name);
}
}
}

View File

@@ -0,0 +1,94 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.stream.supplier;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.function.stream.config.StreamConfigurationProperties;
import org.springframework.cloud.function.stream.config.SupplierInvokingMessageProducer;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.annotation.DirtiesContext.ClassMode;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Marius Bogoevici
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RestartStreamSupplierTests.StreamingFunctionApplication.class)
@DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
public class RestartStreamSupplierTests {
@Autowired
Source source;
@Autowired
MessageCollector messageCollector;
@Autowired
SupplierInvokingMessageProducer<?> producer;
@Rule
public ExpectedException expected = ExpectedException.none();
@Test
public void exhausted() throws Exception {
test();
expected.expect(NullPointerException.class);
test();
}
@Test
public void restart() throws Exception {
test();
assertThat(messageCollector.forChannel(source.output())).isEmpty();
producer.stop();
producer.start();
test();
}
private void test() throws Exception {
Message<?> result = messageCollector.forChannel(source.output()).poll(1000,
TimeUnit.MILLISECONDS);
assertThat(result.getPayload()).isEqualTo("foo");
assertThat(result.getHeaders().get(StreamConfigurationProperties.ROUTE_KEY))
.isEqualTo("simpleSupplier");
}
@SpringBootApplication
public static class StreamingFunctionApplication {
@Bean
public Supplier<String> simpleSupplier() {
return () -> "foo";
}
}
}

View File

@@ -25,7 +25,7 @@ import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.function.stream.StreamConfigurationProperties;
import org.springframework.cloud.function.stream.config.StreamConfigurationProperties;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.context.annotation.Bean;

View File

@@ -0,0 +1,79 @@
/*
* Copyright 2012-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.stream.web;
import java.net.URI;
import java.util.function.Supplier;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.context.embedded.LocalServerPort;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
import reactor.core.publisher.Flux;
/**
* @author Dave Syer
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = "spring.cloud.stream.binder.servlet.prefix=/functions")
public class PrefixTests {
@LocalServerPort
private int port;
@Autowired
private TestRestTemplate rest;
@Test
public void words() throws Exception {
ResponseEntity<String> result = rest.exchange(
RequestEntity.get(new URI("/functions/words")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
public void missing() throws Exception {
ResponseEntity<String> result = rest
.exchange(RequestEntity.get(new URI("/words")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND);
}
@EnableAutoConfiguration(exclude=TestSupportBinderAutoConfiguration.class)
@Configuration
protected static class TestConfiguration {
@Bean({ "words", "get/more" })
public Supplier<Flux<String>> words() {
return () -> Flux.fromArray(new String[] { "foo", "bar" });
}
}
}

View File

@@ -0,0 +1,610 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.stream.web;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.embedded.LocalServerPort;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.annotation.DirtiesContext.ClassMode;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.StringUtils;
import static org.assertj.core.api.Assertions.assertThat;
import reactor.core.publisher.Flux;
/**
* @author Dave Syer
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = {
"logging.level.org.springframework.integration=DEBUG",
"spring.autoconfigure.exclude=org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration" })
@DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
public class RestApplicationTests {
private static final MediaType EVENT_STREAM = MediaType.TEXT_EVENT_STREAM;
@LocalServerPort
private int port;
@Autowired
private TestRestTemplate rest;
@Autowired
private ApplicationConfiguration test;
@Before
public void init() {
test.list.clear();
}
@Test
@Ignore("Needs Spring 5?")
public void wordsSSE() throws Exception {
assertThat(rest.exchange(
RequestEntity.get(new URI("/stream/words")).accept(EVENT_STREAM).build(),
String.class).getBody()).isEqualTo(sse("foo", "bar"));
}
@Test
public void wordsJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/stream/words"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
@Ignore("Fix error handling")
public void errorJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/stream/bang"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[\"foo\"]");
}
@Test
public void words() throws Exception {
ResponseEntity<String> result = rest.exchange(
RequestEntity.get(new URI("/stream/words")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
public void word() throws Exception {
ResponseEntity<String> result = rest.exchange(
RequestEntity.get(new URI("/stream/word")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("[\"foo\"]");
}
@Test
public void foos() throws Exception {
ResponseEntity<String> result = rest.exchange(
RequestEntity.get(new URI("/stream/foos")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody())
.isEqualTo("[{\"value\":\"foo\"},{\"value\":\"bar\"}]");
}
@Test
public void qualifierFoos() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/stream/foos")).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody())
.isEqualTo("[{\"value\":\"[FOO]\"},{\"value\":\"[BAR]\"}]");
}
@Test
public void getMore() throws Exception {
ResponseEntity<String> result = rest.exchange(
RequestEntity.get(new URI("/stream/get/more")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
public void bareWords() throws Exception {
ResponseEntity<String> result = rest.exchange(
RequestEntity.get(new URI("/stream/bareWords")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("[[\"foo\",\"bar\"]]");
}
@Test
@Ignore("Should this even work? Or do we need to be explicit about the JSON?")
public void updates() throws Exception {
ResponseEntity<String> result = rest.exchange(
RequestEntity.post(new URI("/stream/updates")).body("one\ntwo"),
String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
assertThat(test.list).hasSize(2);
assertThat(result.getBody()).isEqualTo("onetwo");
}
@Test
public void updatesJson() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/stream/updates")).contentType(MediaType.APPLICATION_JSON)
.body("[\"one\",\"two\"]"), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
assertThat(test.list).hasSize(2);
assertThat(result.getBody()).isEqualTo("[\"one\",\"two\"]");
}
@Test
public void addFoos() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/stream/addFoos")).contentType(MediaType.APPLICATION_JSON)
.body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
assertThat(test.list).hasSize(2);
assertThat(result.getBody())
.isEqualTo("[{\"value\":\"foo\"}, {\"value\":\"bar\"}]");
}
@Test
public void bareUpdates() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/stream/bareUpdates"))
.contentType(MediaType.APPLICATION_JSON).body("[\"one\",\"two\"]"),
String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
assertThat(test.list).hasSize(2);
assertThat(result.getBody()).isEqualTo("[\"one\",\"two\"]");
}
@Test
public void timeoutJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/stream/timeout"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[\"foo\"]");
}
@Test
public void emptyJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/stream/empty"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[]");
}
@Test
public void sentences() throws Exception {
assertThat(rest.exchange(RequestEntity.get(new URI("/stream/sentences")).build(),
String.class).getBody())
.isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
}
@Test
public void sentencesAcceptAny() throws Exception {
assertThat(rest.exchange(RequestEntity.get(new URI("/stream/sentences"))
.accept(MediaType.ALL).build(), String.class).getBody())
.isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
}
@Test
public void sentencesAcceptJson() throws Exception {
ResponseEntity<String> result = rest
.exchange(
RequestEntity.get(new URI("/stream/sentences"))
.accept(MediaType.APPLICATION_JSON).build(),
String.class);
assertThat(result.getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
assertThat(result.getHeaders().getContentType())
.isGreaterThanOrEqualTo(MediaType.APPLICATION_JSON);
}
@Test
@Ignore("Maybe not supported")
public void sentencesAcceptSse() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.get(new URI("/stream/sentences")).accept(EVENT_STREAM).build(),
String.class);
assertThat(result.getBody())
.isEqualTo(sse("[\"go\",\"home\"]", "[\"come\",\"back\"]"));
assertThat(result.getHeaders().getContentType().isCompatibleWith(EVENT_STREAM))
.isTrue();
}
@Test
public void uppercase() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/stream/uppercase"))
.contentType(MediaType.APPLICATION_JSON).body("[\"foo\",\"bar\"]"),
String.class);
assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]");
}
@Test
public void messages() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/stream/messages")).contentType(MediaType.APPLICATION_JSON)
.header("x-foo", "bar").body("[\"foo\",\"bar\"]"), String.class);
assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]");
assertThat(result.getHeaders().getFirst("x-foo")).isEqualTo("bar");
assertThat(result.getHeaders()).doesNotContainKey("id");
}
@Test
public void headers() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/stream/headers")).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class);
assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]");
assertThat(result.getHeaders().getFirst("foo")).isEqualTo("bar");
assertThat(result.getHeaders()).doesNotContainKey("id");
}
@Test
public void uppercaseSingleValue() throws Exception {
ResponseEntity<String> result = rest
.exchange(
RequestEntity.post(new URI("/stream/uppercase"))
.contentType(MediaType.TEXT_PLAIN).body("foo"),
String.class);
assertThat(result.getBody()).isEqualTo("(FOO)");
}
@Test
@Ignore("WebFlux would split the request body into lines: TODO make this work the same")
public void uppercasePlainText() throws Exception {
ResponseEntity<String> result = rest.exchange(
RequestEntity.post(new URI("/stream/uppercase"))
.contentType(MediaType.TEXT_PLAIN).body("foo\nbar"),
String.class);
assertThat(result.getBody()).isEqualTo("(FOO)(BAR)");
}
@Test
public void uppercaseFoos() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/stream/upFoos")).contentType(MediaType.APPLICATION_JSON)
.body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class);
assertThat(result.getBody())
.isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]");
}
@Test
public void uppercaseFoo() throws Exception {
// Single Foo can be parsed
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/stream/upFoos")).contentType(MediaType.APPLICATION_JSON)
.body("{\"value\":\"foo\"}"), String.class);
assertThat(result.getBody()).isEqualTo("{\"value\":\"FOO\"}");
}
@Test
public void bareUppercaseFoos() throws Exception {
ResponseEntity<String> result = rest
.exchange(
RequestEntity.post(new URI("/stream/bareUpFoos"))
.contentType(MediaType.APPLICATION_JSON)
.body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"),
String.class);
assertThat(result.getBody())
.isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]");
}
@Test
public void bareUppercaseFoo() throws Exception {
// Single Foo can be parsed and returns a single value if the function is defined
// that way
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/stream/bareUpFoos"))
.contentType(MediaType.APPLICATION_JSON).body("{\"value\":\"foo\"}"),
String.class);
assertThat(result.getBody()).isEqualTo("{\"value\":\"FOO\"}");
}
@Test
public void bareUppercase() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/stream/bareUppercase"))
.contentType(MediaType.APPLICATION_JSON).body("[\"foo\",\"bar\"]"),
String.class);
assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]");
}
@Test
public void transform() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/stream/transform"))
.contentType(MediaType.APPLICATION_JSON).body("[\"foo\",\"bar\"]"),
String.class);
assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]");
}
@Test
public void postMore() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity
.post(new URI("/stream/post/more"))
.contentType(MediaType.APPLICATION_JSON).body("[\"foo\",\"bar\"]"),
String.class);
assertThat(result.getBody()).isEqualTo("[\"(FOO)\",\"(BAR)\"]");
}
@Test
public void postMoreFoo() {
assertThat(rest.getForObject("/stream/post/more/foo", String.class))
.isEqualTo("(FOO)");
}
@Test
public void uppercaseGet() {
assertThat(rest.getForObject("/stream/uppercase/foo", String.class))
.isEqualTo("(FOO)");
}
@Test
public void convertGet() {
assertThat(rest.getForObject("/stream/wrap/123", String.class))
.isEqualTo("..123..");
}
@Test
public void convertPost() throws Exception {
ResponseEntity<String> result = rest.exchange(RequestEntity.post(new URI("/stream/wrap"))
.contentType(MediaType.TEXT_PLAIN).body("123"), String.class);
assertThat(result.getBody()).isEqualTo("..123..");
}
@Test
public void convertPostJson() throws Exception {
ResponseEntity<String> result = rest
.exchange(
RequestEntity.post(new URI("/stream/doubler"))
.contentType(MediaType.TEXT_PLAIN).body("123"),
String.class);
assertThat(result.getBody()).isEqualTo("246");
}
@Test
public void supplierFirst() {
assertThat(rest.getForObject("/stream/not/a/function", String.class))
.isEqualTo("[\"hello\"]");
}
@Test
public void convertGetJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/stream/entity/321"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("{\"value\":321}");
}
@Test
public void uppercaseJsonArray() throws Exception {
assertThat(rest.exchange(
RequestEntity.post(new URI("/stream/maps"))
.contentType(MediaType.APPLICATION_JSON)
// The new line in the middle is optional
.body("[{\"value\":\"foo\"},\n{\"value\":\"bar\"}]"),
String.class).getBody())
.isEqualTo("[{\"value\":\"FOO\"},{\"value\":\"BAR\"}]");
}
@Test
@Ignore("Doesn't make sense: if you post in an array you expect to get back an array")
public void uppercaseSSE() throws Exception {
assertThat(rest.exchange(RequestEntity.post(new URI("/stream/uppercase"))
.accept(EVENT_STREAM).contentType(MediaType.APPLICATION_JSON)
.body("[\"foo\",\"bar\"]"), String.class).getBody())
.isEqualTo(sse("(FOO)", "(BAR)"));
}
private String sse(String... values) {
return "data:" + StringUtils.arrayToDelimitedString(values, "\n\ndata:") + "\n\n";
}
@TestConfiguration
public static class ApplicationConfiguration {
private List<String> list = new ArrayList<>();
@Bean({ "uppercase", "transform", "post/more" })
public Function<Flux<String>, Flux<String>> uppercase() {
return flux -> flux.log()
.map(value -> "(" + value.trim().toUpperCase() + ")");
}
@Bean
public Function<String, String> bareUppercase() {
return value -> "(" + value.trim().toUpperCase() + ")";
}
@Bean
public Function<Message<String>, Message<String>> messages() {
return value -> MessageBuilder
.withPayload("(" + value.getPayload().trim().toUpperCase() + ")")
.copyHeaders(value.getHeaders()).build();
}
@Bean
public Function<Flux<Message<String>>, Flux<Message<String>>> headers() {
return flux -> flux.map(value -> MessageBuilder
.withPayload("(" + value.getPayload().trim().toUpperCase() + ")")
.setHeader("foo", "bar").build());
}
@Bean
public Function<Flux<Foo>, Flux<Foo>> upFoos() {
return flux -> flux.log()
.map(value -> new Foo(value.getValue().trim().toUpperCase()));
}
@Bean
public Function<Foo, Foo> bareUpFoos() {
return value -> new Foo(value.getValue().trim().toUpperCase());
}
@Bean
public Function<Flux<Integer>, Flux<String>> wrap() {
return flux -> flux.log().map(value -> ".." + value + "..");
}
@Bean
public Function<Flux<Integer>, Flux<Integer>> doubler() {
return flux -> flux.log().map(value -> 2 * value);
}
@Bean
public Function<Flux<Integer>, Flux<Map<String, Object>>> entity() {
return flux -> flux.log()
.map(value -> Collections.singletonMap("value", value));
}
@Bean
public Function<Flux<HashMap<String, String>>, Flux<Map<String, String>>> maps() {
return flux -> flux.map(value -> {
value.put("value", value.get("value").trim().toUpperCase());
return value;
});
}
@Bean({ "words", "get/more" })
public Supplier<Flux<String>> words() {
return () -> Flux.just("foo", "bar");
}
@Bean
public Supplier<String> word() {
return () -> "foo";
}
@Bean
public Supplier<Flux<Foo>> foos() {
return () -> Flux.just(new Foo("foo"), new Foo("bar"));
}
@Bean
@Qualifier("foos")
public Function<String, Foo> qualifier() {
return value -> new Foo("[" + value.trim().toUpperCase() + "]");
}
@Bean
public Supplier<List<String>> bareWords() {
return () -> Arrays.asList("foo", "bar");
}
@Bean
public Consumer<Flux<String>> updates() {
return flux -> flux.subscribe(value -> list.add(value));
}
@Bean
public Consumer<Flux<Foo>> addFoos() {
return flux -> flux.subscribe(value -> list.add(value.getValue()));
}
@Bean
public Consumer<String> bareUpdates() {
return value -> list.add(value);
}
@Bean
public Supplier<Flux<String>> bang() {
return () -> Flux.fromArray(new String[] { "foo", "bar" }).map(value -> {
if (value.equals("bar")) {
// throw new RuntimeException("Bar");
}
return value;
});
}
@Bean
public Supplier<Flux<String>> empty() {
return () -> Flux.empty();
}
@Bean("not/a/function")
public Supplier<Flux<String>> supplier() {
return () -> Flux.just("hello");
}
@Bean("not/a")
public Function<Flux<String>, Flux<String>> function() {
return input -> Flux.just("bye");
}
@Bean
public Supplier<Flux<String>> timeout() {
return () -> Flux.defer(() -> Flux.<String>create(emitter -> {
emitter.next("foo");
}).timeout(Duration.ofMillis(100L), Flux.empty()));
}
@Bean
public Supplier<Flux<List<String>>> sentences() {
return () -> Flux.just(Arrays.asList("go", "home"),
Arrays.asList("come", "back"));
}
}
public static class Foo {
private String value;
public Foo(String value) {
this.value = value;
}
Foo() {
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
}

View File

@@ -0,0 +1,99 @@
/*
* Copyright 2012-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.stream.web;
import java.net.URI;
import java.util.function.Supplier;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.boot.context.embedded.LocalServerPort;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.cloud.function.stream.web.SingletonTests.TestApplicationConfiguration;
import org.springframework.core.PriorityOrdered;
import org.springframework.http.HttpStatus;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
import reactor.core.publisher.Flux;
/**
* @author Dave Syer
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = { "debug=true",
"spring.autoconfigure.exclude=org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration"})
@ContextConfiguration(classes = TestApplicationConfiguration.class)
public class SingletonTests {
@LocalServerPort
private int port;
@Autowired
private TestRestTemplate rest;
@Test
public void words() throws Exception {
ResponseEntity<String> result = rest.exchange(
RequestEntity.get(new URI("/stream/words")).build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@TestConfiguration
protected static class TestApplicationConfiguration
implements PriorityOrdered, BeanDefinitionRegistryPostProcessor {
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory)
throws BeansException {
}
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry)
throws BeansException {
// Simulates what happens when you add a compiled function
RootBeanDefinition beanDefinition = new RootBeanDefinition(MySupplier.class);
registry.registerBeanDefinition("words", beanDefinition);
}
@Override
public int getOrder() {
return 0;
}
}
static class MySupplier implements Supplier<Flux<String>> {
@Override
public Flux<String> get() {
return Flux.just("foo", "bar");
}
}
}