Remove function composition from the catalog interface
Should be easy enoug hto add back later, but it was causing issues with type conversion where we are npot yet sophisticated enough to chain functions together and keep track of the types being passed between them.
This commit is contained in:
@@ -47,15 +47,6 @@ public class ApplicationContextFunctionCatalog implements FunctionCatalog {
|
||||
return (Function<T, R>) functions.get(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T, R> Function<T, R> composeFunction(String... functionNames) {
|
||||
Function<T, R> function = this.lookupFunction(functionNames[0]);
|
||||
for (int i = 1; i < functionNames.length; i++) {
|
||||
function = function.andThen(this.lookupFunction(functionNames[i]));
|
||||
}
|
||||
return function;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <T> Supplier<T> lookupSupplier(String name) {
|
||||
|
||||
@@ -51,17 +51,6 @@ public abstract class AbstractFunctionRegistry implements FunctionRegistry {
|
||||
|
||||
private final SimpleClassLoader classLoader = new SimpleClassLoader(AbstractFunctionRegistry.class.getClassLoader());
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T, R> Function<T, R> composeFunction(String... functionNames) {
|
||||
@SuppressWarnings("rawtypes")
|
||||
Function function = this.lookupFunction(functionNames[0]);
|
||||
for (int i = 1; i < functionNames.length; i++) {
|
||||
function = function.andThen(this.lookupFunction(functionNames[i]));
|
||||
}
|
||||
return function;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Consumer<T> lookupConsumer(String name) {
|
||||
@SuppressWarnings("unchecked")
|
||||
|
||||
@@ -29,7 +29,5 @@ public interface FunctionCatalog {
|
||||
|
||||
<T, R> Function<T, R> lookupFunction(String name);
|
||||
|
||||
<T, R> Function<T, R> composeFunction(String... functionNames);
|
||||
|
||||
<T> Supplier<T> lookupSupplier(String name);
|
||||
}
|
||||
|
||||
@@ -16,8 +16,6 @@
|
||||
|
||||
package org.springframework.cloud.function.registry;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
@@ -26,6 +24,8 @@ import java.util.function.Function;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
/**
|
||||
@@ -53,15 +53,4 @@ public class FileSystemFunctionRegistryTests {
|
||||
assertEquals("BAR", results.get(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void composeFunction() throws IOException {
|
||||
FileSystemFunctionRegistry registry = new FileSystemFunctionRegistry(this.directory);
|
||||
registry.registerFunction("uppercase", "f->f.map(s->s.toString().toUpperCase())");
|
||||
registry.registerFunction("exclaim", "f->f.map(s->s+\"!!!\")");
|
||||
Function<Flux<String>, Flux<String>> function = registry.composeFunction("uppercase", "exclaim");
|
||||
Flux<String> output = function.apply(Flux.just("foo", "bar"));
|
||||
List<String> results = output.collectList().block();
|
||||
assertEquals("FOO!!!", results.get(0));
|
||||
assertEquals("BAR!!!", results.get(1));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,15 +52,6 @@ public class FunctionExtractingFunctionCatalog implements FunctionCatalog {
|
||||
return (Function<T, R>) find(name, "lookupFunction");
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T, R> Function<T, R> composeFunction(String... functionNames) {
|
||||
Function<T, R> function = this.lookupFunction(functionNames[0]);
|
||||
for (int i = 1; i < functionNames.length; i++) {
|
||||
function = function.andThen(this.lookupFunction(functionNames[i]));
|
||||
}
|
||||
return function;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <T> Supplier<T> lookupSupplier(String name) {
|
||||
|
||||
@@ -17,6 +17,7 @@ package org.springframework.cloud.function.deployer;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.boot.test.web.client.TestRestTemplate;
|
||||
@@ -29,6 +30,8 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||
* @author Dave Syer
|
||||
*
|
||||
*/
|
||||
@Ignore
|
||||
// TODO: Salvage some stuff from this project
|
||||
public class FunctionExtractingFunctionCatalogIntegrationTests {
|
||||
|
||||
private static ConfigurableApplicationContext context;
|
||||
|
||||
@@ -48,10 +48,7 @@ public class StreamConfiguration {
|
||||
@ConditionalOnProperty("spring.cloud.stream.bindings.input.destination")
|
||||
public AbstractFunctionInvoker<?, ?> invoker(FunctionCatalog registry) {
|
||||
String name = properties.getName();
|
||||
Function<Flux<Object>, Flux<Object>> function = (name.indexOf(',') == -1)
|
||||
? registry.lookupFunction(name)
|
||||
: registry.composeFunction(
|
||||
StringUtils.commaDelimitedListToStringArray(name));
|
||||
Function<Flux<Object>, Flux<Object>> function = registry.lookupFunction(name);
|
||||
return new StreamListeningFunctionInvoker(function);
|
||||
}
|
||||
|
||||
|
||||
@@ -51,10 +51,8 @@ public class TaskConfiguration {
|
||||
final Supplier<Flux<Object>> supplier = registry
|
||||
.lookupSupplier(properties.getSupplier());
|
||||
String functionName = properties.getFunction();
|
||||
Function<Flux<Object>, Flux<Object>> function = (functionName.indexOf(',') == -1)
|
||||
? registry.lookupFunction(functionName)
|
||||
: registry.composeFunction(
|
||||
StringUtils.commaDelimitedListToStringArray(functionName));
|
||||
Function<Flux<Object>, Flux<Object>> function = registry
|
||||
.lookupFunction(functionName);
|
||||
final Consumer<Object> consumer = registry
|
||||
.lookupConsumer(properties.getConsumer());
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
@@ -51,13 +51,7 @@ public class FunctionController {
|
||||
@PostMapping(path = "/{name}")
|
||||
public Flux<String> function(@PathVariable String name,
|
||||
@RequestBody Flux<String> body) {
|
||||
Function<Object, Object> function;
|
||||
if (name.contains(",")) {
|
||||
function = functions.composeFunction(name.split(","));
|
||||
}
|
||||
else {
|
||||
function = functions.lookupFunction(name);
|
||||
}
|
||||
Function<Object, Object> function = functions.lookupFunction(name);
|
||||
@SuppressWarnings("unchecked")
|
||||
Flux<String> result = (Flux<String>) function.apply(body);
|
||||
return debug ? result.log() : result;
|
||||
|
||||
Reference in New Issue
Block a user