diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/gateway/FunctionGateway.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/gateway/FunctionGateway.java deleted file mode 100644 index b94362dcf..000000000 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/gateway/FunctionGateway.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.gateway; - -import java.util.function.Consumer; -import java.util.function.Supplier; - -import org.reactivestreams.Publisher; -import org.springframework.scheduling.Trigger; - -/** - * @author Mark Fisher - */ -public interface FunctionGateway { - - R invoke(String functionName, T request); - - void schedule(String functionName, Trigger trigger, Supplier supplier, Consumer consumer); - - void subscribe(Publisher publisher, String functionName, final Consumer consumer); - -} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/gateway/LocalFunctionGateway.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/gateway/LocalFunctionGateway.java deleted file mode 100644 index 9398debe8..000000000 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/gateway/LocalFunctionGateway.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright 2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.gateway; - -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Supplier; - -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import org.springframework.cloud.function.invoker.FunctionInvokingRunnable; -import org.springframework.cloud.function.registry.FunctionCatalog; -import org.springframework.scheduling.TaskScheduler; -import org.springframework.scheduling.Trigger; -import org.springframework.util.Assert; - -/** - * @author Mark Fisher - */ -public class LocalFunctionGateway implements FunctionGateway { - - private final FunctionCatalog catalog; - - private final TaskScheduler scheduler; - - public LocalFunctionGateway(FunctionCatalog catalog, TaskScheduler scheduler) { - Assert.notNull(catalog, "FunctionCatalog must not be null"); - Assert.notNull(scheduler, "TaskScheduler must not be null"); - this.catalog = catalog; - this.scheduler = scheduler; - } - - @Override - public R invoke(String functionName, T request) { - Function function = this.catalog.lookupFunction(functionName); - return function.apply(request); - } - - @Override - public void schedule(String functionName, Trigger trigger, - Supplier supplier, Consumer consumer) { - Function function = this.catalog.lookupFunction(functionName); - this.scheduler.schedule( - new FunctionInvokingRunnable(supplier, function, consumer), - trigger); - } - - @Override - public void subscribe(Publisher publisher, String functionName, - final Consumer consumer) { - final Function function = this.catalog.lookupFunction(functionName); - publisher.subscribe(new Subscriber() { - - @Override - public void onComplete() { - } - - @Override - public void onError(Throwable error) { - } - - @Override - public void onNext(T next) { - if (consumer != null) { - consumer.accept(function.apply(next)); - } - else { - function.apply(next); - } - } - - @Override - public void onSubscribe(Subscription subscription) { - subscription.request(Long.MAX_VALUE); - } - }); - } -} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/invoker/AbstractFunctionInvoker.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/invoker/AbstractFunctionInvoker.java deleted file mode 100644 index f82ab6103..000000000 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/invoker/AbstractFunctionInvoker.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.invoker; - -import java.util.function.Function; - -import org.springframework.util.Assert; - -/** - * @author Mark Fisher - * - * @param function parameter type - * @param function return type - */ -public abstract class AbstractFunctionInvoker { - - private final Function function; - - protected AbstractFunctionInvoker(Function function) { - Assert.notNull(function, "Function must not be null"); - this.function = function; - } - - protected R doInvoke(T input) { - return this.function.apply(input); - } -} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/invoker/FunctionInvokingRunnable.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/invoker/FunctionInvokingRunnable.java deleted file mode 100644 index b7ee58dc4..000000000 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/invoker/FunctionInvokingRunnable.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.invoker; - -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Supplier; - -/** - * @author Mark Fisher - * - * @param output of supplier, input to function - * @param output of function, input to consumer - */ -public class FunctionInvokingRunnable implements Runnable { - - private final Supplier supplier; - - private final Function function; - - private final Consumer consumer; - - public FunctionInvokingRunnable(Supplier supplier, Function function, Consumer consumer) { - this.supplier = supplier; - this.function = function; - this.consumer = consumer; - } - - @Override - public void run() { - this.consumer.accept(this.function.apply(this.supplier.get())); - } -} diff --git a/spring-cloud-function-core/src/test/java/org/springframework/cloud/function/gateway/LocalFunctionGatewayTests.java b/spring-cloud-function-core/src/test/java/org/springframework/cloud/function/gateway/LocalFunctionGatewayTests.java deleted file mode 100644 index 288b9c0ec..000000000 --- a/spring-cloud-function-core/src/test/java/org/springframework/cloud/function/gateway/LocalFunctionGatewayTests.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright 2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.gateway; - -import static org.junit.Assert.assertEquals; - -import java.util.List; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Supplier; - -import org.junit.Before; -import org.junit.Test; - -import org.springframework.cloud.function.registry.FunctionCatalog; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; - -import reactor.core.publisher.Flux; - -/** - * @author Mark Fisher - */ -public class LocalFunctionGatewayTests { - - private final FunctionCatalog catalog = new FunctionCatalog() { - - @Override - public Supplier lookupSupplier(String name) { - return null; - } - - @Override - @SuppressWarnings("unchecked") - public Function, Flux> lookupFunction(String name) { - return ("uppercase".equals(name) ? f->f.map(s->s.toString().toUpperCase()) : null); - } - - @Override - public Consumer lookupConsumer(String name) { - return null; - } - }; - - private final ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); - - @Before - public void init() { - this.scheduler.initialize(); - } - - @Test - public void test() { - LocalFunctionGateway gateway = new LocalFunctionGateway(catalog, scheduler); - Flux output = gateway.invoke("uppercase", Flux.just("foo", "bar")); - List results = output.collectList().block(); - assertEquals("FOO", results.get(0)); - assertEquals("BAR", results.get(1)); - } - - @Test - public void testMultiple() { - for (int i = 0; i < 100; i++) { - test(); - } - } -}