GH-466 Fix SupplierExporter to avoid starting if Supplier is not present

Added condition to the start method of the SupplierExporter to prevent it from starting for cases where there are no Suppliers in catalog

Resolves #466
This commit is contained in:
Oleg Zhurakousky
2020-03-31 16:40:31 +02:00
parent 21d68ff3c8
commit 949f5fb023
2 changed files with 39 additions and 38 deletions

View File

@@ -243,23 +243,20 @@ class FunctionEndpointFactory {
})
.andRoute(GET("/**"), request -> {
Object functionComponent = extract(request);
if (functionComponent instanceof Supplier || functionComponent instanceof Function) {
Class<T> outputType = (Class<T>) this.inspector.getOutputType(functionComponent);
if (functionComponent instanceof Supplier) {
Supplier<? extends Flux<?>> supplier = (Supplier<Flux<?>>) functionComponent;
FunctionWrapper wrapper = RequestProcessor.wrapper(null, null, supplier);
return ServerResponse.ok().body(wrapper.supplier().get(), outputType);
}
else if (functionComponent instanceof Function) {
Function<Flux<?>, Flux<?>> function = (Function<Flux<?>, Flux<?>>) functionComponent;
FunctionWrapper wrapper = RequestProcessor.wrapper(function, null, null);
wrapper.headers(request.headers().asHttpHeaders());
String argument = (String) request.attribute(WebRequestConstants.ARGUMENT).get();
wrapper.argument(Flux.just(argument));
return ServerResponse.ok().body(wrapper.function().apply(wrapper.argument()), outputType);
}
Class<T> outputType = (Class<T>) this.inspector.getOutputType(functionComponent);
if (functionComponent instanceof Supplier) {
Supplier<? extends Flux<?>> supplier = (Supplier<Flux<?>>) functionComponent;
FunctionWrapper wrapper = RequestProcessor.wrapper(null, null, supplier);
return ServerResponse.ok().body(wrapper.supplier().get(), outputType);
}
else {
Function<Flux<?>, Flux<?>> function = (Function<Flux<?>, Flux<?>>) functionComponent;
FunctionWrapper wrapper = RequestProcessor.wrapper(function, null, null);
wrapper.headers(request.headers().asHttpHeaders());
String argument = (String) request.attribute(WebRequestConstants.ARGUMENT).get();
wrapper.argument(Flux.just(argument));
return ServerResponse.ok().body(wrapper.function().apply(wrapper.argument()), outputType);
}
throw new UnsupportedOperationException("Consumer is not supported for GET");
});
}
}

View File

@@ -90,6 +90,8 @@ public class SupplierExporter implements SmartLifecycle {
Flux<Object> streams = Flux.empty();
Set<String> names = this.supplier == null ? this.catalog.getNames(Supplier.class)
: Collections.singleton(this.supplier);
boolean suppliersPresent = false;
for (String name : names) {
Supplier<Publisher<Object>> supplier = this.catalog.lookup(Supplier.class, name);
if (supplier == null) {
@@ -97,32 +99,34 @@ public class SupplierExporter implements SmartLifecycle {
continue;
}
streams = streams.mergeWith(forward(supplier, name));
suppliersPresent = true;
}
this.subscription = streams
.retry(error -> {
/*
* The ConnectException may happen if a server is not yet available/reachable
* The ClassCast is to handle delayed Mono issued by HttpSupplier.transform for non-2xx responses
*/
boolean retry = error instanceof ConnectException || error instanceof ClassCastException
&& this.running;
if (!retry) {
this.ok = false;
if (!this.debug) {
logger.info(error);
if (suppliersPresent) {
this.subscription = streams
.retry(error -> {
/*
* The ConnectException may happen if a server is not yet available/reachable
* The ClassCast is to handle delayed Mono issued by HttpSupplier.transform for non-2xx responses
*/
boolean retry = error instanceof ConnectException || error instanceof ClassCastException
&& this.running;
if (!retry) {
this.ok = false;
if (!this.debug) {
logger.info(error);
}
stop();
}
return retry;
})
.doOnComplete(() -> {
stop();
}
return retry;
})
.doOnComplete(() -> {
stop();
})
.subscribe();
})
.subscribe();
this.ok = true;
this.running = true;
this.ok = true;
this.running = true;
}
}
public boolean isOk() {