From cd9c3113266d0dda664f560eb26026076d3ad4c4 Mon Sep 17 00:00:00 2001 From: markfisher Date: Thu, 22 Sep 2016 08:31:42 -0400 Subject: [PATCH] refactoring * extracted FunctionGateway interface * renamed current implementation to LocalFunctionGateway * moved `compose` methods from FunctionGateway to FunctionRegistry --- .../function/gateway/FunctionGateway.java | 79 +---------------- .../gateway/LocalFunctionGateway.java | 88 +++++++++++++++++++ .../function/registry/FunctionRegistry.java | 4 + .../registry/FunctionRegistrySupport.java | 23 +++++ 4 files changed, 119 insertions(+), 75 deletions(-) create mode 100644 spring-cloud-function-core/src/main/java/org/springframework/cloud/function/gateway/LocalFunctionGateway.java diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/gateway/FunctionGateway.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/gateway/FunctionGateway.java index 43a1bb221..b94362dcf 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/gateway/FunctionGateway.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/gateway/FunctionGateway.java @@ -17,91 +17,20 @@ package org.springframework.cloud.function.gateway; import java.util.function.Consumer; -import java.util.function.Function; import java.util.function.Supplier; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import org.springframework.cloud.function.invoker.FunctionInvokingRunnable; -import org.springframework.cloud.function.registry.FunctionRegistry; -import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.Trigger; -import org.springframework.util.Assert; /** * @author Mark Fisher */ -public class FunctionGateway { +public interface FunctionGateway { - private final FunctionRegistry registry; + R invoke(String functionName, T request); - private final TaskScheduler scheduler; + void schedule(String functionName, Trigger trigger, Supplier supplier, Consumer consumer); - public FunctionGateway(FunctionRegistry registry, TaskScheduler scheduler) { - Assert.notNull(registry, "FunctionRegistry must not be null"); - Assert.notNull(scheduler, "TaskScheduler must not be null"); - this.registry = registry; - this.scheduler = scheduler; - } + void subscribe(Publisher publisher, String functionName, final Consumer consumer); - @SuppressWarnings("unchecked") - public void compose(String name, Function... functions) { - Assert.isTrue(functions != null && functions.length > 1, "more than one Function is required"); - @SuppressWarnings("rawtypes") - Function function = functions[0]; - for (int i = 1; i < functions.length; i++) { - function = function.andThen(functions[i]); - } - this.registry.register(name, function); - } - - @SuppressWarnings("unchecked") - public void compose(String composedFunctionName, String... functionNames) { - Assert.isTrue(functionNames != null && functionNames.length > 1, "more than one Function is required"); - @SuppressWarnings("rawtypes") - Function function = this.registry.lookup(functionNames[0]); - for (int i = 1; i < functionNames.length; i++) { - function = function.andThen(this.registry.lookup(functionNames[i])); - } - this.registry.register(composedFunctionName, function); - } - - public R invoke(String functionName, T request) { - Function function = this.registry.lookup(functionName); - return function.apply(request); - } - - public void schedule(String functionName, Trigger trigger, Supplier supplier, Consumer consumer) { - Function function = this.registry.lookup(functionName); - this.scheduler.schedule(new FunctionInvokingRunnable(supplier, function, consumer), trigger); - } - - public void subscribe(Publisher publisher, String functionName, final Consumer consumer) { - final Function function = this.registry.lookup(functionName); - publisher.subscribe(new Subscriber() { - - @Override - public void onComplete() {} - - @Override - public void onError(Throwable error) {} - - @Override - public void onNext(T next) { - if (consumer != null) { - consumer.accept(function.apply(next)); - } - else { - function.apply(next); - } - } - - @Override - public void onSubscribe(Subscription subscription) { - subscription.request(Long.MAX_VALUE); - } - }); - } } diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/gateway/LocalFunctionGateway.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/gateway/LocalFunctionGateway.java new file mode 100644 index 000000000..71cfee901 --- /dev/null +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/gateway/LocalFunctionGateway.java @@ -0,0 +1,88 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.gateway; + +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import org.springframework.cloud.function.invoker.FunctionInvokingRunnable; +import org.springframework.cloud.function.registry.FunctionRegistry; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.Trigger; +import org.springframework.util.Assert; + +/** + * @author Mark Fisher + */ +public class LocalFunctionGateway implements FunctionGateway { + + private final FunctionRegistry registry; + + private final TaskScheduler scheduler; + + public LocalFunctionGateway(FunctionRegistry registry, TaskScheduler scheduler) { + Assert.notNull(registry, "FunctionRegistry must not be null"); + Assert.notNull(scheduler, "TaskScheduler must not be null"); + this.registry = registry; + this.scheduler = scheduler; + } + + @Override + public R invoke(String functionName, T request) { + Function function = this.registry.lookup(functionName); + return function.apply(request); + } + + @Override + public void schedule(String functionName, Trigger trigger, Supplier supplier, Consumer consumer) { + Function function = this.registry.lookup(functionName); + this.scheduler.schedule(new FunctionInvokingRunnable(supplier, function, consumer), trigger); + } + + @Override + public void subscribe(Publisher publisher, String functionName, final Consumer consumer) { + final Function function = this.registry.lookup(functionName); + publisher.subscribe(new Subscriber() { + + @Override + public void onComplete() {} + + @Override + public void onError(Throwable error) {} + + @Override + public void onNext(T next) { + if (consumer != null) { + consumer.accept(function.apply(next)); + } + else { + function.apply(next); + } + } + + @Override + public void onSubscribe(Subscription subscription) { + subscription.request(Long.MAX_VALUE); + } + }); + } +} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/FunctionRegistry.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/FunctionRegistry.java index f9220bcf6..37f703215 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/FunctionRegistry.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/FunctionRegistry.java @@ -29,4 +29,8 @@ public interface FunctionRegistry { void register(String name, String function); + void compose(String composedFunctionName, Function... functions); + + void compose(String composedFunctionName, String... functionNames); + } diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/FunctionRegistrySupport.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/FunctionRegistrySupport.java index e8cf45904..1a3498949 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/FunctionRegistrySupport.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/FunctionRegistrySupport.java @@ -19,6 +19,7 @@ package org.springframework.cloud.function.registry; import java.util.function.Function; import org.springframework.cloud.function.compiler.FunctionCompiler; +import org.springframework.util.Assert; /** * @author Mark Fisher @@ -32,4 +33,26 @@ public abstract class FunctionRegistrySupport implements FunctionRegistry { Function function = compiler.compile(code); this.register(name, function); } + + @SuppressWarnings("unchecked") + public void compose(String name, Function... functions) { + Assert.isTrue(functions != null && functions.length > 1, "more than one Function is required"); + @SuppressWarnings("rawtypes") + Function function = functions[0]; + for (int i = 1; i < functions.length; i++) { + function = function.andThen(functions[i]); + } + this.register(name, function); + } + + @SuppressWarnings("unchecked") + public void compose(String composedFunctionName, String... functionNames) { + Assert.isTrue(functionNames != null && functionNames.length > 1, "more than one Function is required"); + @SuppressWarnings("rawtypes") + Function function = this.lookup(functionNames[0]); + for (int i = 1; i < functionNames.length; i++) { + function = function.andThen(this.lookup(functionNames[i])); + } + this.register(composedFunctionName, function); + } }