GH-243, GH-257 Added reactive consumer wrapper

- Added wrapper for an already reactive consumer to ensure that consumers can be consistently represented as Function<Flux, Mono>
- Fixed the big that deal with inconsistent result in web environments due to inconsistent representation of the Consumers
- Polished tests

Resolves #243
Resolves #257
This commit is contained in:
Oleg Zhurakousky
2019-02-11 15:30:04 +01:00
parent 660aebc4d9
commit 805b85b102
10 changed files with 175 additions and 69 deletions

View File

@@ -35,6 +35,7 @@ import org.springframework.cloud.function.core.FluxConsumer;
import org.springframework.cloud.function.core.FluxFunction;
import org.springframework.cloud.function.core.FluxSupplier;
import org.springframework.cloud.function.core.FluxToMonoFunction;
import org.springframework.cloud.function.core.FluxedConsumer;
import org.springframework.cloud.function.core.MonoToFluxFunction;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
@@ -164,6 +165,11 @@ public class FunctionRegistration<T> implements BeanNameAware {
target = (S) new FluxConsumer((Consumer<?>) target);
}
}
else {
if (target instanceof Consumer) {
target = (S) new FluxedConsumer((Consumer<?>) target);
}
}
if (Mono.class.isAssignableFrom(type.getOutputWrapper())) {
target = (S) new FluxToMonoFunction((Function) target);

View File

@@ -58,7 +58,6 @@ import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.FunctionType;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.context.catalog.FunctionRegistrationEvent;
import org.springframework.cloud.function.context.catalog.FunctionUnregistrationEvent;
import org.springframework.cloud.function.core.FluxConsumer;
@@ -562,22 +561,22 @@ public class ContextFunctionCatalogAutoConfiguration {
}
protected class BeanFactoryFunctionInspector implements FunctionInspector {
private ContextFunctionRegistry processor;
public BeanFactoryFunctionInspector(ContextFunctionRegistry processor) {
this.processor = processor;
}
@Override
public FunctionRegistration<?> getRegistration(Object function) {
FunctionRegistration<?> registration = this.processor
.getRegistration(function);
return registration;
}
}
// protected class BeanFactoryFunctionInspector implements FunctionInspector {
//
// private ContextFunctionRegistry processor;
//
// public BeanFactoryFunctionInspector(ContextFunctionRegistry processor) {
// this.processor = processor;
// }
//
// @Override
// public FunctionRegistration<?> getRegistration(Object function) {
// FunctionRegistration<?> registration = this.processor
// .getRegistration(function);
// return registration;
// }
//
// }
@Configuration
@ConditionalOnClass(Gson.class)

View File

@@ -560,10 +560,10 @@ public class ContextFunctionCatalogAutoConfigurationTests {
"spring.cloud.function.compile.foos.lambda=f -> f.subscribe("
+ getClass().getName() + "::set)",
"spring.cloud.function.compile.foos.type=consumer");
assertThat((Consumer<?>) this.catalog.lookup(Consumer.class, "foos"))
.isInstanceOf(Consumer.class);
assertThat((Function<?, ?>) this.catalog.lookup(Function.class, "foos"))
.isInstanceOf(Function.class);
assertThat(this.inspector
.getInputWrapper(this.catalog.lookup(Consumer.class, "foos")))
.getInputWrapper(this.catalog.lookup(Function.class, "foos")))
.isEqualTo(Flux.class);
@SuppressWarnings("unchecked")
Consumer<Flux<String>> consumer = (Consumer<Flux<String>>) this.context

View File

@@ -17,35 +17,29 @@
package org.springframework.cloud.function.core;
import java.util.function.Consumer;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Wrapper for a {@link Consumer} implementation that converts a non-reactive consumer
* into a reactive function.
* Wrapper for a {@link Consumer} implementation that converts a <i>non-reactive</i>
* consumer into a reactive function ({@code Function<Flux<?>, Mono<?>>}).
*
* @param <T> input type of target consumer
* @param <I> input type of target consumer
* @author Dave Syer
* @author Oleg Zhurakousky
* @see FluxedConsumer
*/
public class FluxConsumer<T>
implements Function<Flux<T>, Mono<Void>>, FluxWrapper<Consumer<T>> {
public class FluxConsumer<I>
extends WrappedFunction<I, Void, Flux<I>, Mono<Void>, Consumer<I>> {
private final Consumer<T> consumer;
public FluxConsumer(Consumer<T> consumer) {
this.consumer = consumer;
public FluxConsumer(Consumer<I> target) {
super(target);
}
@Override
public Consumer<T> getTarget() {
return this.consumer;
}
@Override
public Mono<Void> apply(Flux<T> input) {
return input.doOnNext(this.consumer).then();
public Mono<Void> apply(Flux<I> input) {
return input.doOnNext(this.getTarget()).then();
}
}

View File

@@ -27,24 +27,18 @@ import reactor.core.publisher.Flux;
* @param <T> input type of target function
* @param <R> output type of target function
* @author Mark Fisher
* @author Oleg Zhurakousky
*/
public class FluxFunction<T, R>
implements Function<Flux<T>, Flux<R>>, FluxWrapper<Function<T, R>> {
public class FluxFunction<I, O>
extends WrappedFunction<I, O, Flux<I>, Flux<O>, Function<I, O>> {
private final Function<T, R> function;
public FluxFunction(Function<T, R> function) {
this.function = function;
public FluxFunction(Function<I, O> target) {
super(target);
}
@Override
public Function<T, R> getTarget() {
return this.function;
}
@Override
public Flux<R> apply(Flux<T> input) {
return input.map(i -> this.function.apply(i));
public Flux<O> apply(Flux<I> input) {
return input.map(value -> this.getTarget().apply(value));
}
}

View File

@@ -0,0 +1,47 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.core;
import java.util.function.Consumer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Wrapper for a {@link Consumer} implementation that converts a reactive consumer into a
* reactive function ({@code Function<Flux<?>, Mono<?>>}). This is primarily done for
* consistent representation of reactive and non-reactive consumers.
*
* @param <I> input type of target consumer
* @author Oleg Zhurakousky
* @since 2.0.1
* @see FluxConsumer
*
*/
public class FluxedConsumer<I>
extends WrappedFunction<I, Void, Flux<I>, Mono<Void>, Consumer<Flux<I>>> {
public FluxedConsumer(Consumer<Flux<I>> target) {
super(target);
}
@Override
public Mono<Void> apply(Flux<I> input) {
return Mono.fromRunnable(() -> this.getTarget().accept(input));
}
}

View File

@@ -0,0 +1,52 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.core;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
/**
* Base class for all wrappers that represent underlying functions (user defined
* suppliers, functions and/or consumers) as reactive functions.
*
* @param <I> input type of target consumer
* @param <O> output type of target consumer
* @param <IP> reactive input type of target function (instance of {@link Publisher}
* @param <OP> reactive output type of target function (instance of {@link Publisher}
* @param <T> actual target function (instance of {@link Supplier}, {@link Function} or
* {@link Consumer})
* @author Oleg Zhurakousky
* @since 2.0.1
*/
abstract class WrappedFunction<I, O, IP extends Publisher<I>, OP extends Publisher<O>, T>
implements Function<IP, OP>, FluxWrapper<T> {
private final T target;
WrappedFunction(T target) {
this.target = target;
}
@Override
public T getTarget() {
return this.target;
}
}

View File

@@ -40,7 +40,9 @@ import reactor.core.publisher.Mono;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.function.core.FluxConsumer;
import org.springframework.cloud.function.core.FluxWrapper;
import org.springframework.cloud.function.core.FluxedConsumer;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.cloud.function.web.util.HeaderUtils;
import org.springframework.core.MethodParameter;
@@ -200,7 +202,14 @@ public class RequestProcessor {
flux = messages(wrapper, function == null ? consumer : function, flux);
}
Mono<ResponseEntity<?>> responseEntityMono = null;
if (function != null) {
if (function instanceof FluxedConsumer || function instanceof FluxConsumer) {
((Mono<?>) function.apply(flux)).subscribe();
logger.debug("Handled POST with consumer");
responseEntityMono = Mono
.just(ResponseEntity.status(HttpStatus.ACCEPTED).build());
}
else {
Flux<?> result = Flux.from(function.apply(flux));
logger.debug("Handled POST with function");
if (stream) {
@@ -211,12 +220,6 @@ public class RequestProcessor {
body == null ? null : !(body instanceof Collection), false);
}
}
else if (consumer != null) {
consumer.accept(flux);
logger.debug("Handled POST with consumer");
responseEntityMono = Mono
.just(ResponseEntity.status(HttpStatus.ACCEPTED).build());
}
return responseEntityMono;
}

View File

@@ -124,14 +124,23 @@ public class HttpPostIntegrationTests {
assertThat(result.getBody()).isEqualTo(null);
}
@Test
public void addFoosFlux() throws Exception {
ResponseEntity<String> result = this.rest.exchange(RequestEntity
.post(new URI("/addFoosFlux")).contentType(MediaType.APPLICATION_JSON)
.body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
assertThat(this.test.list).hasSize(2);
assertThat(result.getBody()).isEqualTo(null);
}
@Test
public void bareUpdates() throws Exception {
ResponseEntity<String> result = this.rest.exchange(RequestEntity
.post(new URI("/bareUpdates")).contentType(MediaType.APPLICATION_JSON)
.body("[\"one\",\"two\"]"), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
assertThat(this.test.list).hasSize(2);
assertThat(result.getBody()).isEqualTo("[]");
}
@Test
@@ -399,12 +408,6 @@ public class HttpPostIntegrationTests {
};
}
// @Bean
// public Function<byte[],?> byteArrayInputFunction() {
//// return value -> new Foo(value.getValue().trim().toUpperCase());
// throw new UnsupportedOperationException("boom?");
// }
@Bean
public Function<Flux<Integer>, Flux<String>> wrap() {
return flux -> flux.log().map(value -> ".." + value + "..");
@@ -437,13 +440,22 @@ public class HttpPostIntegrationTests {
}
@Bean
public Consumer<Flux<Foo>> addFoos() {
public Consumer<Flux<Foo>> addFoosFlux() {
return flux -> flux.subscribe(value -> this.list.add(value.getValue()));
}
@Bean
public Consumer<Foo> addFoos() {
return value -> {
this.list.add(value.getValue());
};
}
@Bean
public Consumer<String> bareUpdates() {
return value -> this.list.add(value);
return value -> {
this.list.add(value);
};
}
@Bean("not/a")

View File

@@ -128,9 +128,8 @@ public class HttpPostIntegrationTests {
ResponseEntity<String> result = this.rest.exchange(RequestEntity
.post(new URI("/bareUpdates")).contentType(MediaType.APPLICATION_JSON)
.body("[\"one\",\"two\"]"), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
assertThat(this.test.list).hasSize(2);
assertThat(result.getBody()).isEqualTo("[]");
}
@Test