Consolidated all function wrappers around WrappedFunction

This commit is contained in:
Oleg Zhurakousky
2019-02-11 19:08:52 +01:00
parent f8e966f79f
commit b076f6349e
3 changed files with 16 additions and 30 deletions

View File

@@ -36,6 +36,7 @@ import org.springframework.cloud.function.core.FluxFunction;
import org.springframework.cloud.function.core.FluxSupplier;
import org.springframework.cloud.function.core.FluxToMonoFunction;
import org.springframework.cloud.function.core.FluxedConsumer;
import org.springframework.cloud.function.core.FluxedFunction;
import org.springframework.cloud.function.core.MonoToFluxFunction;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
@@ -171,7 +172,7 @@ public class FunctionRegistration<T> implements BeanNameAware {
target = (S) new FluxedConsumer((Consumer<?>) target);
}
else if (target instanceof Function) {
// target = (S) new FluxedFunction((Function<?, ?>) target);
target = (S) new FluxedFunction((Function<?, ?>) target);
}
result = result.target(target).names(this.names)

View File

@@ -22,7 +22,12 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Marker wrapper for target {@code Function<Flux<?>, Mono<?>>}.
* Wrapper to mark function {@code Function<Flux<?>, Mono<?>>}.
*
* While it may look similar to {@link FluxedConsumer}
* the fundamental difference is that this class represents a function that
* returns {@link Mono} of type {@code <O>}, while {@link FluxedConsumer} is
* a consumer that has been decorated as {@code Function<Flux<?>, Mono<Void>>}.
*
* @param <I> type of {@link Flux} input of the target function
* @param <O> type of {@link Mono} output of the target function
@@ -30,25 +35,15 @@ import reactor.core.publisher.Mono;
* @since 2.0
*/
public class FluxToMonoFunction<I, O>
implements Function<Flux<I>, Mono<O>>, FluxWrapper<Function<Flux<I>, Mono<O>>> {
extends WrappedFunction<I, O, Flux<I>, Mono<O>, Function<Flux<I>, Mono<O>>> {
private final Function<Flux<I>, Mono<O>> function;
/**
* @param function target function
*/
public FluxToMonoFunction(Function<Flux<I>, Mono<O>> function) {
this.function = function;
}
@Override
public Function<Flux<I>, Mono<O>> getTarget() {
return this.function;
public FluxToMonoFunction(Function<Flux<I>, Mono<O>> target) {
super(target);
}
@Override
public Mono<O> apply(Flux<I> input) {
return this.function.apply(input);
return this.getTarget().apply(input);
}
}

View File

@@ -30,25 +30,15 @@ import reactor.core.publisher.Mono;
* @since 2.0
*/
public class MonoToFluxFunction<I, O>
implements Function<Mono<I>, Flux<O>>, FluxWrapper<Function<Mono<I>, Flux<O>>> {
extends WrappedFunction<I, O, Mono<I>, Flux<O>, Function<Mono<I>, Flux<O>>> {
private final Function<Mono<I>, Flux<O>> function;
/**
* @param function target function
*/
public MonoToFluxFunction(Function<Mono<I>, Flux<O>> function) {
this.function = function;
}
@Override
public Function<Mono<I>, Flux<O>> getTarget() {
return this.function;
public MonoToFluxFunction(Function<Mono<I>, Flux<O>> target) {
super(target);
}
@Override
public Flux<O> apply(Mono<I> input) {
return this.function.apply(input);
return this.getTarget().apply(input);
}
}