From 0cfb2b413fdbc41325e10a9ec07ba8de5713b993 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Mon, 8 Nov 2021 16:16:39 +0100 Subject: [PATCH] Initial round of deprecation rmovals --- .../context/FunctionRegistration.java | 30 -- .../AbstractComposableFunctionRegistry.java | 439 ------------------ .../context/catalog/MessageFunction.java | 106 ----- .../context/catalog/MessageSupplier.java | 63 --- .../context/config/FunctionContextUtils.java | 15 - .../context/message/MessageUtils.java | 7 - .../context/FunctionRegistrationTests.java | 58 --- .../context/catalog/MessageConsumerTests.java | 49 -- .../context/catalog/MessageFunctionTests.java | 113 ----- .../context/catalog/MessageSupplierTests.java | 79 ---- .../cloud/function/core/FluxConsumer.java | 48 -- .../cloud/function/core/FluxFunction.java | 47 -- .../cloud/function/core/FluxSupplier.java | 71 --- .../function/core/FluxToMonoFunction.java | 52 --- .../cloud/function/core/FluxWrapper.java | 30 -- .../cloud/function/core/FluxedConsumer.java | 50 -- .../cloud/function/core/FluxedFunction.java | 47 -- .../core/FunctionFactoryMetadata.java | 34 -- .../cloud/function/core/IsolatedSupplier.java | 57 --- .../cloud/function/core/MonoSupplier.java | 54 --- .../function/core/MonoToFluxFunction.java | 47 -- .../cloud/function/core/WrappedFunction.java | 55 --- 22 files changed, 1551 deletions(-) delete mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java delete mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageFunction.java delete mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageSupplier.java delete mode 100644 spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/FunctionRegistrationTests.java delete mode 100644 spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageConsumerTests.java delete mode 100644 spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageFunctionTests.java delete mode 100644 spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageSupplierTests.java delete mode 100644 spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxConsumer.java delete mode 100644 spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxFunction.java delete mode 100644 spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxSupplier.java delete mode 100644 spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxToMonoFunction.java delete mode 100644 spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxWrapper.java delete mode 100644 spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxedConsumer.java delete mode 100644 spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxedFunction.java delete mode 100644 spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FunctionFactoryMetadata.java delete mode 100644 spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedSupplier.java delete mode 100644 spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/MonoSupplier.java delete mode 100644 spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/MonoToFluxFunction.java delete mode 100644 spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/WrappedFunction.java diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionRegistration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionRegistration.java index 536ce9ac2..c579273dd 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionRegistration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionRegistration.java @@ -35,14 +35,6 @@ import reactor.core.publisher.Mono; import org.springframework.beans.factory.BeanNameAware; import org.springframework.cloud.function.context.catalog.FunctionTypeUtils; import org.springframework.cloud.function.context.config.RoutingFunction; -import org.springframework.cloud.function.core.FluxConsumer; -import org.springframework.cloud.function.core.FluxFunction; -import org.springframework.cloud.function.core.FluxSupplier; -import org.springframework.cloud.function.core.FluxToMonoFunction; -import org.springframework.cloud.function.core.FluxedConsumer; -import org.springframework.cloud.function.core.FluxedFunction; -import org.springframework.cloud.function.core.MonoSupplier; -import org.springframework.cloud.function.core.MonoToFluxFunction; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; @@ -193,28 +185,6 @@ public class FunctionRegistration implements BeanNameAware { S target = (S) this.target; result = new FunctionRegistration(target); result.type(this.type.getType()); - - if (!this.type.isWrapper()) { - target = target instanceof Supplier - ? (S) new FluxSupplier((Supplier) target) - : target instanceof Function - ? (S) new FluxFunction((Function) target) - : (S) new FluxConsumer((Consumer) target); - } - else if (Mono.class.isAssignableFrom(this.type.getOutputWrapper())) { - target = target instanceof Supplier - ? (S) new MonoSupplier((Supplier) target) - : (S) new FluxToMonoFunction((Function) target); - } - else if (Mono.class.isAssignableFrom(this.type.getInputWrapper())) { - target = (S) new MonoToFluxFunction((Function) target); - } - else if (target instanceof Consumer) { - target = (S) new FluxedConsumer((Consumer) target); - } - else if (target instanceof Function) { - target = (S) new FluxedFunction((Function) target); - } result = result.target(target).names(this.names) .type(result.type.wrap(Flux.class)).properties(this.properties); } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java deleted file mode 100644 index f0b934c9f..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java +++ /dev/null @@ -1,439 +0,0 @@ -/* - * Copyright 2019-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.context.catalog; - -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import reactor.core.publisher.Flux; - -import org.springframework.cloud.function.context.FunctionRegistration; -import org.springframework.cloud.function.context.FunctionRegistry; -import org.springframework.cloud.function.context.FunctionType; -import org.springframework.cloud.function.context.config.RoutingFunction; -import org.springframework.cloud.function.core.FluxToMonoFunction; -import org.springframework.cloud.function.core.IsolatedConsumer; -import org.springframework.cloud.function.core.IsolatedFunction; -import org.springframework.cloud.function.core.IsolatedSupplier; -import org.springframework.cloud.function.core.MonoToFluxFunction; -import org.springframework.context.ApplicationEventPublisher; -import org.springframework.context.ApplicationEventPublisherAware; -import org.springframework.context.EnvironmentAware; -import org.springframework.core.env.Environment; -import org.springframework.core.env.StandardEnvironment; -import org.springframework.util.Assert; -import org.springframework.util.CollectionUtils; -import org.springframework.util.StringUtils; - -/** - * Base implementation of {@link FunctionRegistry} which supports function composition - * during lookups. For example if this registry contains function 'a' and 'b' you can - * compose them into a single function by simply piping two names together during the - * lookup {@code this.lookup(Function.class, "a|b")}. - * - * Comma ',' is also supported as composition delimiter (e.g., {@code "a,b"}). - * - * @author Oleg Zhurakousky - * @author Dave Syer - * @since 2.1 - * - */ -public abstract class AbstractComposableFunctionRegistry implements FunctionRegistry, - ApplicationEventPublisherAware, EnvironmentAware { - - private final Map functions = new ConcurrentHashMap<>(); - - private final Map names = new ConcurrentHashMap<>(); - - private final Map types = new ConcurrentHashMap<>(); - - private Environment environment = new StandardEnvironment(); - - protected ApplicationEventPublisher applicationEventPublisher; - - @SuppressWarnings("unchecked") - @Override - public T lookup(Class type, String name) { - String functionDefinitionName = !StringUtils.hasText(name) - && this.environment.containsProperty("spring.cloud.function.definition") - ? this.environment.getProperty("spring.cloud.function.definition") - : name; - return (T) this.doLookup(type, functionDefinitionName); - } - - @SuppressWarnings("serial") - @Override - public Set getNames(Class type) { - if (type == null) { - return new HashSet(getSupplierNames()) { - { - addAll(getFunctionNames()); - } - }; - } - if (Supplier.class.isAssignableFrom(type)) { - return this.getSupplierNames(); - } - if (Function.class.isAssignableFrom(type)) { - return this.getFunctionNames(); - } - return Collections.emptySet(); - } - - /** - * Returns the names of available Suppliers. - * @return immutable {@link Set} of available {@link Supplier} names. - */ - public Set getSupplierNames() { - return this.functions.entrySet().stream() - .filter(entry -> entry.getValue() instanceof Supplier) - .map(entry -> entry.getKey()) - .collect(Collectors.toSet()); - } - - /** - * Returns the names of available Functions. - * @return immutable {@link Set} of available {@link Function} names. - */ - public Set getFunctionNames() { - return this.functions.entrySet().stream() - .filter(entry -> !(entry.getValue() instanceof Supplier)) - .map(entry -> entry.getKey()) - .collect(Collectors.toSet()); - } - - public boolean hasSuppliers() { - return !CollectionUtils.isEmpty(getSupplierNames()); - } - - public boolean hasFunctions() { - return !CollectionUtils.isEmpty(getFunctionNames()); - } - - /** - * The size of this catalog, which is the count of all Suppliers, - * Function and Consumers currently registered. - * - * @return the count of all Suppliers, Function and Consumers currently registered. - */ - @Override - public int size() { - return this.functions.size(); - } - - public FunctionType getFunctionType(String name) { - return this.types.get(name); - } - - /** - * A reverse lookup where one can determine the actual name of the function reference. - * @param function should be an instance of {@link Supplier}, {@link Function} or - * {@link Consumer}; - * @return the name of the function or null. - */ - public String lookupFunctionName(Object function) { - return this.names.containsKey(function) ? this.names.get(function) : null; - } - - @Override - public void setApplicationEventPublisher( - ApplicationEventPublisher applicationEventPublisher) { - this.applicationEventPublisher = applicationEventPublisher; - } - - @Override - public void setEnvironment(Environment environment) { - this.environment = environment; - } - - - public FunctionRegistration getRegistration(Object function) { - String functionName = function == null ? null - : this.lookupFunctionName(function); - if (StringUtils.hasText(functionName)) { - FunctionRegistration registration = new FunctionRegistration( - function, functionName); - FunctionType functionType = this.findType(registration, functionName); - return registration.type(functionType.getType()); - } - return null; - } - - @Override - public void register(FunctionRegistration functionRegistration) { - Assert.notEmpty(functionRegistration.getNames(), - "'registration' must contain at least one name before it is registered in catalog."); - register(functionRegistration, functionRegistration.getNames().iterator().next()); - } - - - - /** - * Registers function wrapped by the provided FunctionRegistration with - * this FunctionRegistry. - * - * @param registration instance of {@link FunctionRegistration} - * @param key the name of the function - */ - protected void register(FunctionRegistration registration, String key) { - Object target = registration.getTarget(); - if (registration.getType() != null) { - this.addType(key, registration.getType()); - } - else { - FunctionType functionType = findType(registration, key); - if (functionType == null) { - return; // TODO fixme - } - this.addType(key, functionType); - registration.type(functionType.getType()); - } - Class type; - registration = isolated(registration).wrap(); - target = registration.getTarget(); - if (target instanceof Supplier) { - type = Supplier.class; - for (String name : registration.getNames()) { - this.addSupplier(name, (Supplier) registration.getTarget()); - } - } - else if (target instanceof Function) { - type = Function.class; - for (String name : registration.getNames()) { - this.addFunction(name, (Function) registration.getTarget()); - } - } - else { - return; - } - this.addName(registration.getTarget(), key); - if (this.applicationEventPublisher != null) { - this.applicationEventPublisher.publishEvent(new FunctionRegistrationEvent( - registration.getTarget(), type, registration.getNames())); - } - } - - protected FunctionType findType(FunctionRegistration functionRegistration, String name) { - return functionRegistration.getType() != null - ? functionRegistration.getType() - : this.getFunctionType(name); - } - - - protected void addSupplier(String name, Supplier supplier) { - this.functions.put(name, supplier); - } - - protected void addFunction(String name, Function function) { - this.functions.put(name, function); - } - - protected void addType(String name, FunctionType functionType) { - this.types.computeIfAbsent(name, str -> functionType); - } - - protected void addName(Object function, String name) { - this.names.put(function, name); - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - private FunctionRegistration isolated(FunctionRegistration input) { - FunctionRegistration registration = (FunctionRegistration) input; - Object target = registration.getTarget(); - boolean isolated = getClass().getClassLoader() != target.getClass() - .getClassLoader(); - if (isolated) { - if (target instanceof Supplier && isolated) { - target = new IsolatedSupplier((Supplier) target); - } - else if (target instanceof Function) { - target = new IsolatedFunction((Function) target); - } - else if (target instanceof Consumer) { - target = new IsolatedConsumer((Consumer) target); - } - } - - registration.target(target); - return registration; - } - - private Object compose(String name, Map lookup) { - - name = name.replaceAll(",", "|").trim(); - Object composedFunction = null; - - if (lookup.containsKey(name)) { - composedFunction = lookup.get(name); - } - else if (name.equals("") && lookup.size() >= 1 && lookup.size() <= 2) { // we may have RoutingFunction function - String functionName = lookup.keySet().stream() - .filter(fName -> !fName.equals(RoutingFunction.FUNCTION_NAME)) - .findFirst().orElseGet(() -> null); - composedFunction = lookup.get(functionName); - } - else { - String[] stages = StringUtils.delimitedListToStringArray(name, "|"); - - AtomicBoolean supplierPresent = new AtomicBoolean(); - List> composableFunctions = Stream.of(stages) - .map(funcName -> find(funcName, supplierPresent.get())) - .filter(x -> x != null) - .peek(f -> supplierPresent.set(f.getTarget() instanceof Supplier)) - .collect(Collectors.toList()); - FunctionRegistration composedRegistration = composableFunctions - .stream().reduce((a, z) -> composeFunctions(a, z)) - .orElseGet(() -> null); - - if (composedRegistration != null - && composedRegistration.getTarget() != null - && !this.types.containsKey(name)) { - - composedFunction = composedRegistration.getTarget(); - this.addType(name, composedRegistration.getType()); - this.addName(composedFunction, name); - if (composedFunction instanceof Function || composedFunction instanceof Consumer) { - this.addFunction(name, (Function) composedFunction); - } - else if (composedFunction instanceof Supplier) { - this.addSupplier(name, (Supplier) composedFunction); - } - } - - } - - return composedFunction; - } - - private FunctionRegistration find(String name, boolean supplierFound) { - Object result = this.functions.get(name); - if (result == null && !StringUtils.hasText(name)) { - if (supplierFound && this.getFunctionNames().size() == 1) { - result = this.functions.get(this.getFunctionNames().iterator().next()); - } - else if (!supplierFound && this.getSupplierNames().size() == 1) { - result = this.functions.get(this.getSupplierNames().iterator().next()); - } - } - - return getRegistration(result); - } - - @SuppressWarnings("unchecked") - private FunctionRegistration composeFunctions(FunctionRegistration aReg, - FunctionRegistration bReg) { - FunctionType aType = aReg.getType(); - FunctionType bType = bReg.getType(); - Object a = aReg.getTarget(); - Object b = bReg.getTarget(); - if (aType != null && bType != null) { - if (aType.isMessage() && !bType.isMessage()) { - bType = bType.message(); - b = message(b); - } - } - Object composedFunction = null; -// if (a instanceof Supplier && b instanceof Function) { -// Supplier> supplier = (Supplier>) a; -// if (b instanceof FluxConsumer) { -// if (supplier instanceof FluxSupplier) { -// FluxConsumer fConsumer = ((FluxConsumer) b); -// composedFunction = (Supplier>) () -> Mono.from( -// supplier.get().compose(v -> fConsumer.apply(supplier.get()))); -// } -// else { -// throw new IllegalStateException( -// "The provided supplier is finite (i.e., already composed with Consumer) " -// + "therefore it can not be composed with another consumer"); -// } -// } -// else { -// Function function = (Function) b; -// composedFunction = (Supplier) () -> function -// .apply(supplier.get()); -// } -// } -// else - if (a instanceof Function && b instanceof Function) { - Function function1 = (Function) a; - Function function2 = (Function) b; - if (function1 instanceof FluxToMonoFunction) { - if (function2 instanceof MonoToFluxFunction) { - composedFunction = function1.andThen(function2); - } - else { - throw new IllegalStateException( - "The provided function is finite (i.e., returns Mono) " - + "therefore it can *only* be composed with compatible function (i.e., Function"); - } - } - else if (function2 instanceof FluxToMonoFunction) { - composedFunction = new FluxToMonoFunction( - ((Function, Flux>) a).andThen( - ((FluxToMonoFunction) b).getTarget())); - } - else { - composedFunction = function1.andThen(function2); - } - } - else if (a instanceof Function && b instanceof Consumer) { - Function function = (Function) a; - Consumer consumer = (Consumer) b; - composedFunction = (Consumer) v -> consumer.accept(function.apply(v)); - } - else { - throw new IllegalArgumentException(String - .format("Could not compose %s and %s", a.getClass(), b.getClass())); - } - String name = aReg.getNames().iterator().next() + "|" - + bReg.getNames().iterator().next(); - return new FunctionRegistration<>(composedFunction, name) - .type(FunctionType.compose(aType, bType)); - } - - private Object message(Object input) { - if (input instanceof Supplier) { - return new MessageSupplier((Supplier) input); - } - if (input instanceof Consumer) { - return new MessageConsumer((Consumer) input); - } - if (input instanceof Function) { - return new MessageFunction((Function) input); - } - return input; - } - - private Object doLookup(Class type, String name) { - Object function = this.compose(name, this.functions); - if (function != null && type != null && !type.isAssignableFrom(function.getClass())) { - function = null; - } - return function; - } - -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageFunction.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageFunction.java deleted file mode 100644 index 6650e07d0..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageFunction.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright 2019-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.context.catalog; - -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; - -import org.reactivestreams.Publisher; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import org.springframework.cloud.function.core.FluxConsumer; -import org.springframework.cloud.function.core.FluxFunction; -import org.springframework.cloud.function.core.FluxToMonoFunction; -import org.springframework.cloud.function.core.FluxedFunction; -import org.springframework.cloud.function.core.MonoToFluxFunction; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.support.MessageBuilder; - -/** - * @author Dave Syer - * @since 2.1 - */ -public class MessageFunction - implements Function, Publisher>> { - - private final Function delegate; - - public MessageFunction(Function delegate) { - this.delegate = delegate; - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Override - public Publisher> apply(Publisher input) { - Flux incomingFlux = Flux.from(input); - Flux> flux = incomingFlux.map(value -> { - if (!(value instanceof Message)) { - return MessageBuilder.withPayload(value).build(); - } - return (Message) value; - }); - - if (this.delegate instanceof FluxFunction) { - Function target = (Function) ((FluxFunction) this.delegate) - .getTarget(); - return flux.map( - value -> MessageBuilder.withPayload(target.apply(value.getPayload())) - .copyHeaders(value.getHeaders()).build()); - } - if (this.delegate instanceof MonoToFluxFunction) { - Function, Flux> target = ((MonoToFluxFunction) this.delegate) - .getTarget(); - return flux.next() - .flatMapMany(value -> target.apply(Mono.just(value.getPayload())) - .map(object -> MessageBuilder.withPayload(object) - .copyHeaders(value.getHeaders()).build())); - } - if (this.delegate instanceof FluxToMonoFunction) { - Function, Mono> target = ((FluxToMonoFunction) this.delegate) - .getTarget(); - AtomicReference headers = new AtomicReference<>(); - return target.apply(flux.map(messsage -> { - headers.set(messsage.getHeaders()); - return messsage.getPayload(); - })).map(payload -> MessageBuilder.withPayload(payload) - .copyHeaders(headers.get()).build()); - } - if (this.delegate instanceof FluxConsumer) { - FluxConsumer target = ((FluxConsumer) this.delegate); - AtomicReference headers = new AtomicReference<>(); - Mono mapped = target.apply(flux.map(messsage -> { - headers.set(messsage.getHeaders()); - return messsage.getPayload(); - })); - return mapped.map(value -> MessageBuilder.createMessage(null, headers.get())); - } - - // TODO: cover the case that delegate is actually Function - if (this.delegate instanceof FluxedFunction) { - Function, Flux> target = ((FluxedFunction) this.delegate); - return (Flux) flux.map(value -> ((Message) value).getPayload()).transform(target); - } - Function function = this.delegate; - return flux.map( - value -> { - return MessageBuilder.withPayload(function.apply(value.getPayload())) - .copyHeaders(value.getHeaders()).build(); - }); - } -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageSupplier.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageSupplier.java deleted file mode 100644 index 999a3271b..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageSupplier.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright 2019-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.context.catalog; - -import java.util.function.Supplier; - -import org.reactivestreams.Publisher; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import org.springframework.cloud.function.core.FluxSupplier; -import org.springframework.cloud.function.core.MonoSupplier; -import org.springframework.messaging.Message; -import org.springframework.messaging.support.MessageBuilder; - -/** - * @author Dave Syer - */ -public class MessageSupplier implements Supplier>> { - - private Supplier delegate; - - public MessageSupplier(Supplier delegate) { - this.delegate = delegate; - } - - @Override - public Publisher> get() { - if (this.delegate instanceof FluxSupplier) { - return ((Flux) this.delegate.get()) - .map(value -> MessageBuilder.withPayload(value).build()); - } - if (this.delegate instanceof MonoSupplier) { - return ((Mono) this.delegate.get()) - .map(value -> MessageBuilder.withPayload(value).build()); - } - Object product = this.delegate.get(); - if (product instanceof Publisher) { - return Flux.from((Publisher) product) - .map(value -> MessageBuilder.withPayload(value).build()); - } - if (product instanceof Iterable) { - return Flux.fromIterable((Iterable) product) - .map(value -> MessageBuilder.withPayload(value).build()); - } - return Mono.just(MessageBuilder.withPayload(product).build()); - } - -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/FunctionContextUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/FunctionContextUtils.java index 1665b9d81..af3313f93 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/FunctionContextUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/FunctionContextUtils.java @@ -29,7 +29,6 @@ import org.springframework.beans.factory.config.ConstructorArgumentValues; import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.cloud.function.context.catalog.FunctionTypeUtils; -import org.springframework.cloud.function.core.FunctionFactoryMetadata; import org.springframework.context.annotation.ScannedGenericBeanDefinition; import org.springframework.core.ResolvableType; import org.springframework.core.io.Resource; @@ -84,20 +83,6 @@ public abstract class FunctionContextUtils { if (type != null) { param = type.getType(); } - else { - Class beanClass = definition.hasBeanClass() ? definition.getBeanClass() : null; - if (beanClass != null - && !FunctionFactoryMetadata.class.isAssignableFrom(beanClass)) { - param = beanClass; - } - else { - Object bean = registry.getBean(actualName); - // could be FunctionFactoryMetadata. . . TODO investigate and fix - if (bean instanceof FunctionFactoryMetadata) { - param = ((FunctionFactoryMetadata) bean).getFactoryMethod().getGenericReturnType(); - } - } - } } return param; } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/MessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/MessageUtils.java index 2c6b3c7bd..6b7aa1140 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/MessageUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/message/MessageUtils.java @@ -21,7 +21,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import org.springframework.cloud.function.core.FluxWrapper; import org.springframework.cloud.function.core.Isolated; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; @@ -61,9 +60,6 @@ public abstract class MessageUtils { */ public static Object create(Object handler, Object payload, Map headers) { - if (handler instanceof FluxWrapper) { - handler = ((FluxWrapper) handler).getTarget(); - } if (payload instanceof Message) { headers = new HashMap<>(headers); headers.putAll(((Message) payload).getHeaders()); @@ -93,9 +89,6 @@ public abstract class MessageUtils { * @return a message with the correct class loader */ public static Message unpack(Object handler, Object message) { - if (handler instanceof FluxWrapper) { - handler = ((FluxWrapper) handler).getTarget(); - } if (!(handler instanceof Isolated)) { if (message instanceof Message) { return (Message) message; diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/FunctionRegistrationTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/FunctionRegistrationTests.java deleted file mode 100644 index 10f755f55..000000000 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/FunctionRegistrationTests.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright 2012-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.context; - -import java.util.function.Function; - -import org.junit.jupiter.api.Test; - -import static org.assertj.core.api.Assertions.assertThat; - -/** - * @author Dave Syer - * - */ -public class FunctionRegistrationTests { - - @Test - public void noTypeByDefault() { - FunctionRegistration registration = new FunctionRegistration<>(new Foos(), - "foos"); - assertThat(registration.getType()).isNull(); - assertThat(registration.getNames()).contains("foos"); - } - - @Test - public void wrap() { - FunctionRegistration registration = new FunctionRegistration<>(new Foos(), - "foos").type(FunctionType.of(Foos.class).getType()); - FunctionRegistration other = registration.wrap(); - assertThat(registration.getType().isWrapper()).isFalse(); - assertThat(other.getType().isWrapper()).isTrue(); - assertThat(other.getTarget()).isNotEqualTo(registration.getTarget()); - } - - private static class Foos implements Function { - - @Override - public String apply(Integer t) { - return "i=" + t; - } - - } - -} diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageConsumerTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageConsumerTests.java deleted file mode 100644 index 08221e709..000000000 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageConsumerTests.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2019-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.context.catalog; - -import java.util.ArrayList; -import java.util.List; -import java.util.function.Consumer; - -import org.junit.jupiter.api.Test; -import reactor.core.publisher.Flux; - -import org.springframework.messaging.support.MessageBuilder; - -import static org.assertj.core.api.Assertions.assertThat; - -/** - * @author Dave Syer - */ -public class MessageConsumerTests { - - private List items = new ArrayList<>(); - - @Test - public void plainConsumer() { - MessageConsumer consumer = new MessageConsumer(input()); - consumer.accept(Flux - .just(MessageBuilder.withPayload("foo").setHeader("foo", "bar").build())); - assertThat(this.items).hasSize(1); - } - - private Consumer input() { - return value -> this.items.add(value); - } - -} diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageFunctionTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageFunctionTests.java deleted file mode 100644 index 54094ac64..000000000 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageFunctionTests.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright 2019-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.context.catalog; - -import java.util.ArrayList; -import java.util.List; -import java.util.function.Consumer; -import java.util.function.Function; - -import org.junit.jupiter.api.Test; -import org.reactivestreams.Publisher; -import reactor.core.publisher.Flux; -import reactor.test.StepVerifier; - -import org.springframework.cloud.function.core.FluxConsumer; -import org.springframework.cloud.function.core.FluxFunction; -import org.springframework.cloud.function.core.FluxToMonoFunction; -import org.springframework.cloud.function.core.MonoToFluxFunction; -import org.springframework.messaging.Message; -import org.springframework.messaging.support.MessageBuilder; - -import static org.assertj.core.api.Assertions.assertThat; - -/** - * @author Dave Syer - */ -public class MessageFunctionTests { - - private List items = new ArrayList<>(); - - @Test - public void plainFunction() { - MessageFunction function = new MessageFunction(uppercase()); - Publisher> result = function.apply(Flux - .just(MessageBuilder.withPayload("foo").setHeader("foo", "bar").build())); - StepVerifier.create(result).assertNext(message -> { - assertThat(message.getPayload()).isEqualTo("FOO"); - assertThat(message.getHeaders()).containsEntry("foo", "bar"); - }); - } - - @Test - public void fluxFunction() { - MessageFunction function = new MessageFunction(new FluxFunction<>(uppercase())); - Publisher> result = function.apply(Flux - .just(MessageBuilder.withPayload("foo").setHeader("foo", "bar").build())); - StepVerifier.create(result).assertNext(message -> { - assertThat(message.getPayload()).isEqualTo("FOO"); - assertThat(message.getHeaders()).containsEntry("foo", "bar"); - }); - } - - @Test - public void fluxToMonoFunction() { - MessageFunction function = new MessageFunction( - new FluxToMonoFunction( - flux -> flux.next().map(uppercase()))); - Publisher> result = function.apply(Flux - .just(MessageBuilder.withPayload("foo").setHeader("foo", "bar").build())); - StepVerifier.create(result).assertNext(message -> { - assertThat(message.getPayload()).isEqualTo("FOO"); - assertThat(message.getHeaders()).containsEntry("foo", "bar"); - }); - } - - @Test - public void monoToFunction() { - MessageFunction function = new MessageFunction( - new MonoToFluxFunction( - mono -> Flux.from(mono.map(uppercase())))); - Publisher> result = function.apply(Flux - .just(MessageBuilder.withPayload("foo").setHeader("foo", "bar").build())); - StepVerifier.create(result).assertNext(message -> { - assertThat(message.getPayload()).isEqualTo("FOO"); - assertThat(message.getHeaders()).containsEntry("foo", "bar"); - }); - } - - @Test - public void fluxConsumer() { - MessageFunction function = new MessageFunction(new FluxConsumer<>(stash())); - Publisher> result = function.apply(Flux - .just(MessageBuilder.withPayload("foo").setHeader("foo", "bar").build())); - StepVerifier.create(result).assertNext(message -> { - assertThat(message.getPayload()).isEqualTo(null); - assertThat(message.getHeaders()).containsEntry("foo", "bar"); - assertThat(this.items).hasSize(1); - }); - } - - private Consumer stash() { - return value -> this.items.add(value); - } - - private Function uppercase() { - return value -> value.toUpperCase(); - } - -} diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageSupplierTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageSupplierTests.java deleted file mode 100644 index 8ff2bd6be..000000000 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/MessageSupplierTests.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright 2019-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.context.catalog; - -import java.util.Arrays; -import java.util.Collection; -import java.util.function.Supplier; - -import org.junit.jupiter.api.Test; -import reactor.core.publisher.Flux; -import reactor.test.StepVerifier; - -import static org.assertj.core.api.Assertions.assertThat; - -/** - * @author Dave Syer - */ -public class MessageSupplierTests { - - @Test - public void plainSupplier() { - MessageSupplier supplier = new MessageSupplier(input()); - StepVerifier.create(supplier.get()).assertNext(message -> { - assertThat(message.getPayload()).isEqualTo("foo"); - assertThat(message.getHeaders()).isEmpty(); - }); - } - - @Test - public void collectionSupplier() { - MessageSupplier supplier = new MessageSupplier(inputs()); - StepVerifier.create(supplier.get()).assertNext(message -> { - assertThat(message.getPayload()).isEqualTo("foo"); - assertThat(message.getHeaders()).isEmpty(); - }).assertNext(message -> { - assertThat(message.getPayload()).isEqualTo("bar"); - assertThat(message.getHeaders()).isEmpty(); - }); - } - - @Test - public void fluxSupplier() { - MessageSupplier supplier = new MessageSupplier(flux()); - StepVerifier.create(supplier.get()).assertNext(message -> { - assertThat(message.getPayload()).isEqualTo("foo"); - assertThat(message.getHeaders()).isEmpty(); - }).assertNext(message -> { - assertThat(message.getPayload()).isEqualTo("bar"); - assertThat(message.getHeaders()).isEmpty(); - }); - } - - private Supplier input() { - return () -> "foo"; - } - - private Supplier> inputs() { - return () -> Arrays.asList("foo", "bar"); - } - - private Supplier> flux() { - return () -> Flux.just("foo", "bar"); - } - -} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxConsumer.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxConsumer.java deleted file mode 100644 index 78e64f2e8..000000000 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxConsumer.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2012-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.core; - -import java.util.function.Consumer; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -/** - * Wrapper for a {@link Consumer} implementation that converts a non-reactive - * consumer into a reactive function ({@code Function, Mono>}). - * - * @param input type of target consumer - * @author Dave Syer - * @author Oleg Zhurakousky - * @see FluxedConsumer - * - * @deprecated since 3.1 no longer used by the framework - */ -@Deprecated -public class FluxConsumer - extends WrappedFunction, Mono, Consumer> { - - public FluxConsumer(Consumer target) { - super(target); - } - - @Override - public Mono apply(Flux input) { - return input.doOnNext(this.getTarget()).then(); - } - -} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxFunction.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxFunction.java deleted file mode 100644 index 74933963e..000000000 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxFunction.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2012-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.core; - -import java.util.function.Function; - -import reactor.core.publisher.Flux; - -/** - * {@link Function} implementation that wraps a target Function so that the target's - * simple input and output types will be wrapped as {@link Flux} instances. - * - * @param input type of target function - * @param output type of target function - * @author Mark Fisher - * @author Oleg Zhurakousky - * - * @deprecated since 3.1 no longer used by the framework - */ -@Deprecated -public class FluxFunction - extends WrappedFunction, Flux, Function> { - - public FluxFunction(Function target) { - super(target); - } - - @Override - public Flux apply(Flux input) { - return input.map(value -> this.getTarget().apply(value)); - } - -} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxSupplier.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxSupplier.java deleted file mode 100644 index 8d28038a9..000000000 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxSupplier.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2012-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.core; - -import java.time.Duration; -import java.util.function.Supplier; -import java.util.stream.Stream; - -import reactor.core.publisher.Flux; - -/** - * {@link Supplier} implementation that wraps a target Supplier so that the target's - * simple output type will be wrapped in a {@link Flux} instance. If a {@link Duration} is - * provided, the Flux will produce output periodically, invoking the target Supplier's - * {@code get} method at each interval. If no Duration is provided, the target will be - * invoked only once. - * - * @param output type of target supplier - * @author Mark Fisher - * - * @deprecated since 3.1 no longer used by the framework - */ -@Deprecated -public class FluxSupplier implements Supplier>, FluxWrapper> { - - private final Supplier supplier; - - private final Duration period; - - public FluxSupplier(Supplier supplier) { - this(supplier, null); - } - - public FluxSupplier(Supplier supplier, Duration period) { - this.supplier = supplier; - this.period = period; - } - - @Override - public Supplier getTarget() { - return this.supplier; - } - - @Override - @SuppressWarnings({ "unchecked", "rawtypes" }) - public Flux get() { - if (this.period != null) { - return Flux.interval(this.period).map(i -> this.supplier.get()); - } - Object result = this.supplier.get(); - if (result instanceof Stream) { - return Flux.fromStream((Stream) result); - } - return Flux.just((T) result); - } - -} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxToMonoFunction.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxToMonoFunction.java deleted file mode 100644 index a923d1823..000000000 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxToMonoFunction.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright 2012-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.core; - -import java.util.function.Function; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -/** - * Wrapper to mark function {@code Function, Mono>}. - * - * While it may look similar to {@link FluxedConsumer} the fundamental difference is that - * this class represents a function that returns {@link Mono} of type {@code }, while - * {@link FluxedConsumer} is a consumer that has been decorated as - * {@code Function, Mono>}. - * - * @param type of {@link Flux} input of the target function - * @param type of {@link Mono} output of the target function - * @author Oleg Zhurakousky - * @since 2.0 - * - * @deprecated since 3.1 no longer used by the framework - */ -@Deprecated -public class FluxToMonoFunction - extends WrappedFunction, Mono, Function, Mono>> { - - public FluxToMonoFunction(Function, Mono> target) { - super(target); - } - - @Override - public Mono apply(Flux input) { - return this.getTarget().apply(input); - } - -} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxWrapper.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxWrapper.java deleted file mode 100644 index 524e19a0c..000000000 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxWrapper.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2012-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.core; - -/** - * @param target type - * @author Dave Syer - * - * @deprecated since 3.1 no longer used by the framework - */ -@Deprecated -public interface FluxWrapper { - - T getTarget(); - -} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxedConsumer.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxedConsumer.java deleted file mode 100644 index 227b9e017..000000000 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxedConsumer.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2019-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.core; - -import java.util.function.Consumer; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -/** - * Wrapper for a {@link Consumer} implementation that converts a reactive consumer into a - * reactive function ({@code Function, Mono>}). This is primarily done for - * consistent representation of reactive and non-reactive consumers. - * - * @param input type of target consumer - * @author Oleg Zhurakousky - * @since 2.0.1 - * @see FluxConsumer - * - * @deprecated since 3.1 no longer used by the framework - * - */ -@Deprecated -public class FluxedConsumer - extends WrappedFunction, Mono, Consumer>> { - - public FluxedConsumer(Consumer> target) { - super(target); - } - - @Override - public Mono apply(Flux input) { - return Mono.fromRunnable(() -> this.getTarget().accept(input)); - } - -} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxedFunction.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxedFunction.java deleted file mode 100644 index 1808c5993..000000000 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxedFunction.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2019-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.core; - -import java.util.function.Function; - -import reactor.core.publisher.Flux; - -/** - * {@link Function} implementation that wraps a target Function so that the target's - * simple input and output types will be wrapped as {@link Flux} instances. - * - * @param input type of target function - * @param output type of target function - * @author Oleg Zhurakousky - * @since 2.0.1 - * - * @deprecated since 3.1 no longer used by the framework - */ -@Deprecated -public class FluxedFunction - extends WrappedFunction, Flux, Function, Flux>> { - - public FluxedFunction(Function, Flux> target) { - super(target); - } - - @Override - public Flux apply(Flux input) { - return input.transform(this.getTarget()); - } - -} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FunctionFactoryMetadata.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FunctionFactoryMetadata.java deleted file mode 100644 index 4f5f0b10d..000000000 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FunctionFactoryMetadata.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright 2012-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.core; - -import java.lang.reflect.Method; - -/** - * @param target type - * @author Dave Syer - * - * @deprecated since 3.1 no longer used by the framework - */ -@Deprecated -public interface FunctionFactoryMetadata { - - Method getFactoryMethod(); - - F getTarget(); - -} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedSupplier.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedSupplier.java deleted file mode 100644 index 7c9c8ba07..000000000 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/IsolatedSupplier.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright 2012-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.core; - -import java.util.function.Supplier; - -import org.springframework.util.ClassUtils; - -/** - * @param supplied type - * @author Dave Syer - * @deprecated since 3.1 no longer used by the framework - */ -@Deprecated -public class IsolatedSupplier implements Supplier, Isolated { - - private final Supplier supplier; - - private final ClassLoader classLoader; - - public IsolatedSupplier(Supplier supplier) { - this.supplier = supplier; - this.classLoader = supplier.getClass().getClassLoader(); - } - - @Override - public ClassLoader getClassLoader() { - return this.classLoader; - } - - @Override - public T get() { - ClassLoader context = ClassUtils - .overrideThreadContextClassLoader(this.classLoader); - try { - return this.supplier.get(); - } - finally { - ClassUtils.overrideThreadContextClassLoader(context); - } - } - -} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/MonoSupplier.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/MonoSupplier.java deleted file mode 100644 index d1a5d9222..000000000 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/MonoSupplier.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2019-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.core; - -import java.util.function.Supplier; - -import reactor.core.publisher.Mono; - -/** - * {@link Supplier} implementation that wraps a target Supplier so that the target's - * simple output type will be wrapped in a {@link Mono} instance. - * - * @param output type of target supplier - * @author Mark Fisher - * @since 2.1 - * - * @deprecated since 3.1 no longer used by the framework - */ -@Deprecated -public class MonoSupplier implements Supplier>, FluxWrapper> { - - private final Supplier supplier; - - public MonoSupplier(Supplier supplier) { - this.supplier = supplier; - } - - @Override - public Supplier getTarget() { - return this.supplier; - } - - @Override - @SuppressWarnings("unchecked") - public Mono get() { - Object result = this.supplier.get(); - return Mono.just((T) result); - } - -} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/MonoToFluxFunction.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/MonoToFluxFunction.java deleted file mode 100644 index e0cf7f2c4..000000000 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/MonoToFluxFunction.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2012-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.core; - -import java.util.function.Function; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -/** - * Marker wrapper for target Function<Mono, Flux>. - * - * @param type of {@link Mono} input of the target function - * @param type of {@link Flux} output of the target function - * @author Oleg Zhurakousky - * @since 2.0 - * - * @deprecated since 3.1 no longer used by the framework - */ -@Deprecated -public class MonoToFluxFunction - extends WrappedFunction, Flux, Function, Flux>> { - - public MonoToFluxFunction(Function, Flux> target) { - super(target); - } - - @Override - public Flux apply(Mono input) { - return this.getTarget().apply(input); - } - -} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/WrappedFunction.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/WrappedFunction.java deleted file mode 100644 index 65b762b7d..000000000 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/WrappedFunction.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2019-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.core; - -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Supplier; - -import org.reactivestreams.Publisher; - -/** - * Base class for all wrappers that represent underlying functions (user defined - * suppliers, functions and/or consumers) as reactive functions. - * - * @param input type of target consumer - * @param output type of target consumer - * @param reactive input type of target function (instance of {@link Publisher} - * @param reactive output type of target function (instance of {@link Publisher} - * @param actual target function (instance of {@link Supplier}, {@link Function} or - * {@link Consumer}) - * @author Oleg Zhurakousky - * @since 2.0.1 - * - * @deprecated since 3.1 no longer used by the framework - */ -@Deprecated -public abstract class WrappedFunction, OP extends Publisher, T> - implements Function, FluxWrapper { - - private final T target; - - WrappedFunction(T target) { - this.target = target; - } - - @Override - public T getTarget() { - return this.target; - } - -}