Enforce Checkstyle failures

* Fix `FileSupplierConfiguration` to use `toReactivePublisher(true)`
to defer a start of channel adapters until reactive subscription.
* Also use `IntegrationReactiveUtils.messageSourceToFlux()` API instead of manual Reactor API composition
This commit is contained in:
Artem Bilan
2024-01-16 17:57:38 -05:00
parent 0f65110e89
commit 494311bcdd
2 changed files with 8 additions and 20 deletions

View File

@@ -114,7 +114,6 @@ configure(javaProjects) { subproject ->
checkstyle {
toolVersion = '10.3'
configDirectory = rootProject.file('etc/checkstyle')
ignoreFailures = true
}
// dependencies that are common across all java projects

View File

@@ -21,12 +21,9 @@ import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -35,7 +32,6 @@ import org.springframework.cloud.fn.common.file.FileConsumerProperties;
import org.springframework.cloud.fn.common.file.FileReadingMode;
import org.springframework.cloud.fn.common.file.FileUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.file.FileReadingMessageSource;
@@ -47,6 +43,7 @@ import org.springframework.integration.file.filters.FileSystemPersistentAcceptOn
import org.springframework.integration.file.filters.RegexPatternFileListFilter;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.util.StringUtils;
@@ -67,11 +64,6 @@ public class FileSupplierConfiguration {
private final FileConsumerProperties fileConsumerProperties;
@Autowired
@Lazy
@Qualifier("fileMessageSource")
private FileReadingMessageSource fileMessageSource;
public FileSupplierConfiguration(FileSupplierProperties fileSupplierProperties,
FileConsumerProperties fileConsumerProperties) {
@@ -110,21 +102,18 @@ public class FileSupplierConfiguration {
}
@Bean
public Flux<Message<?>> fileMessageFlux() {
return Mono
.<Message<?>>create(
(monoSink) -> monoSink.onRequest((value) -> monoSink.success(this.fileMessageSource.receive())))
.subscribeOn(Schedulers.boundedElastic())
.repeatWhenEmpty((it) -> it.delayElements(this.fileSupplierProperties.getDelayWhenEmpty()))
.repeat()
.doOnRequest((r) -> this.fileMessageSource.start());
public Flux<Message<File>> fileMessageFlux(FileReadingMessageSource fileReadingMessageSource) {
return IntegrationReactiveUtils.messageSourceToFlux(fileReadingMessageSource)
.contextWrite(Context.of(IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY,
this.fileSupplierProperties.getDelayWhenEmpty()))
.doOnRequest((r) -> fileReadingMessageSource.start());
}
@Bean
@ConditionalOnExpression("environment['file.consumer.mode'] != 'ref'")
public Publisher<Message<Object>> fileReadingFlow(Flux<Message<?>> fileMessageFlux) {
IntegrationFlowBuilder flowBuilder = IntegrationFlow.from(fileMessageFlux);
return FileUtils.enhanceFlowForReadingMode(flowBuilder, this.fileConsumerProperties).toReactivePublisher();
return FileUtils.enhanceFlowForReadingMode(flowBuilder, this.fileConsumerProperties).toReactivePublisher(true);
}
@Bean