GH-146: SFTP supplier: Use proper FileListFilter
Fixes https://github.com/spring-cloud/stream-applications/issues/146 The `SftpRegexPatternFileListFilter` is used for the `filenamePattern` option, too. It seems like a copy/paste artefact * Change `filenamePattern`-based configuration for the `SftpSimplePatternFileListFilter` * Fix `SftpSupplierApplicationTests` for multi-OS compatibility
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2018-2020 the original author or authors.
|
||||
* Copyright 2018-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -60,6 +60,7 @@ import org.springframework.integration.sftp.dsl.Sftp;
|
||||
import org.springframework.integration.sftp.dsl.SftpInboundChannelAdapterSpec;
|
||||
import org.springframework.integration.sftp.filters.SftpPersistentAcceptOnceFileListFilter;
|
||||
import org.springframework.integration.sftp.filters.SftpRegexPatternFileListFilter;
|
||||
import org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter;
|
||||
import org.springframework.integration.sftp.session.SftpRemoteFileTemplate;
|
||||
import org.springframework.integration.util.IntegrationReactiveUtils;
|
||||
import org.springframework.lang.Nullable;
|
||||
@@ -83,7 +84,6 @@ import org.springframework.util.StringUtils;
|
||||
@Configuration
|
||||
@EnableConfigurationProperties({SftpSupplierProperties.class, FileConsumerProperties.class})
|
||||
@Import({SftpSupplierFactoryConfiguration.class})
|
||||
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
|
||||
public class SftpSupplierConfiguration {
|
||||
|
||||
private static final String METADATA_STORE_PREFIX = "sftpSource/";
|
||||
@@ -135,15 +135,14 @@ public class SftpSupplierConfiguration {
|
||||
@Bean
|
||||
public FileListFilter<LsEntry> chainFilter(SftpSupplierProperties sftpSupplierProperties,
|
||||
ConcurrentMetadataStore metadataStore) {
|
||||
|
||||
ChainFileListFilter<LsEntry> chainFilter = new ChainFileListFilter<>();
|
||||
|
||||
if (StringUtils.hasText(sftpSupplierProperties.getFilenamePattern())) {
|
||||
chainFilter
|
||||
.addFilter(new SftpRegexPatternFileListFilter(sftpSupplierProperties.getFilenamePattern()));
|
||||
chainFilter.addFilter(new SftpSimplePatternFileListFilter(sftpSupplierProperties.getFilenamePattern()));
|
||||
}
|
||||
else if (sftpSupplierProperties.getFilenameRegex() != null) {
|
||||
chainFilter
|
||||
.addFilter(new SftpRegexPatternFileListFilter(sftpSupplierProperties.getFilenameRegex()));
|
||||
chainFilter.addFilter(new SftpRegexPatternFileListFilter(sftpSupplierProperties.getFilenameRegex()));
|
||||
}
|
||||
|
||||
chainFilter.addFilter(new SftpPersistentAcceptOnceFileListFilter(metadataStore, METADATA_STORE_PREFIX));
|
||||
@@ -213,6 +212,7 @@ public class SftpSupplierConfiguration {
|
||||
@ConditionalOnProperty(prefix = "sftp.supplier", value = "delete-remote-files")
|
||||
public RemoteFileDeletingAdvice remoteFileDeletingAdvice(SftpRemoteFileTemplate sftpTemplate,
|
||||
SftpSupplierProperties sftpSupplierProperties) {
|
||||
|
||||
return new RemoteFileDeletingAdvice(sftpTemplate, sftpSupplierProperties.getRemoteFileSeparator());
|
||||
}
|
||||
|
||||
@@ -280,6 +280,7 @@ public class SftpSupplierConfiguration {
|
||||
}
|
||||
|
||||
@Bean
|
||||
@SuppressWarnings("unchecked")
|
||||
public MessageSource<?> targetMessageSource(PollableChannel listingChannel,
|
||||
SftpListingMessageProducer sftpListingMessageProducer) {
|
||||
return () -> {
|
||||
@@ -310,9 +311,7 @@ public class SftpSupplierConfiguration {
|
||||
predicate = sftpSupplierProperties.getFilenameRegex().asPredicate();
|
||||
}
|
||||
|
||||
GenericSelector<String> selector = predicate::test;
|
||||
|
||||
return selector;
|
||||
return predicate::test;
|
||||
}
|
||||
|
||||
@Bean
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
* Copyright 2020-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -16,6 +16,10 @@
|
||||
|
||||
package org.springframework.cloud.fn.supplier.sftp;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.fail;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
@@ -31,8 +35,6 @@ import java.util.function.Supplier;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
|
||||
@@ -44,9 +46,8 @@ import org.springframework.integration.metadata.MetadataStore;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.fail;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
public class SftpSupplierApplicationTests extends SftpTestSupport {
|
||||
|
||||
@@ -109,11 +110,9 @@ public class SftpSupplierApplicationTests extends SftpTestSupport {
|
||||
SftpSupplierProperties properties = context.getBean(SftpSupplierProperties.class);
|
||||
|
||||
final AtomicReference<String> expectedFileName = new AtomicReference<>(
|
||||
properties.getRemoteDir() + File.separator + "sftpSource1.txt");
|
||||
properties.getRemoteDir() + "/sftpSource1.txt");
|
||||
StepVerifier.create(sftpSupplier.get())
|
||||
.assertNext(message -> {
|
||||
assertThat(expectedFileName.get()).contains(message.getPayload());
|
||||
})
|
||||
.assertNext(message -> assertThat(expectedFileName.get()).contains(message.getPayload()))
|
||||
.expectTimeout(Duration.ofMillis(1000))
|
||||
.verify(Duration.ofSeconds(30));
|
||||
|
||||
@@ -403,4 +402,5 @@ public class SftpSupplierApplicationTests extends SftpTestSupport {
|
||||
static class SftpSupplierTestApplication {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user