Introduce basic support for user-configurable sorting of the remote file list.

This commit is contained in:
Andrea Montemaggio
2021-01-12 16:51:10 -06:00
committed by David Turanski
parent 99bc311ec0
commit 65f3b82d0b
7 changed files with 189 additions and 15 deletions

View File

@@ -157,6 +157,11 @@ $$password$$:: $$The password to use to connect to the server.$$ *($$String$$, d
$$port$$:: $$The port of the server.$$ *($$Integer$$, default: `$$22$$`)*
$$private-key$$:: $$Resource location of user's private key.$$ *($$Resource$$, default: `$$<none>$$`)*
$$username$$:: $$The username to use to connect to the server.$$ *($$String$$, default: `$$<none>$$`)*
=== sftp.supplier.sort-by
$$attribute$$:: $$Attribute of the file listing entry to sort by (FILENAME, ATIME: last access time, MTIME: last modified time).$$ *($$Attribute$$, default: `$$<none>$$`)*
$$dir$$:: $$Sorting direction (ASC or DESC).$$ *($$Dir$$, default: `$$<none>$$`)*
//end::configuration-properties[]
== Examples

View File

@@ -1,5 +1,6 @@
configuration-properties.classes=org.springframework.cloud.fn.supplier.sftp.SftpSupplierProperties, \
org.springframework.cloud.fn.supplier.sftp.SftpSupplierProperties$Factory,\
org.springframework.cloud.fn.supplier.sftp.SftpSupplierProperties$Factory, \
org.springframework.cloud.fn.supplier.sftp.SftpSupplierProperties$SortSpec, \
org.springframework.cloud.fn.common.file.FileConsumerProperties, \
org.springframework.cloud.fn.common.metadata.store.MetadataStoreProperties, \
org.springframework.cloud.fn.common.metadata.store.MetadataStoreProperties$Gemfire, \

View File

@@ -1,5 +1,6 @@
configuration-properties.classes=org.springframework.cloud.fn.supplier.sftp.SftpSupplierProperties, \
org.springframework.cloud.fn.supplier.sftp.SftpSupplierProperties$Factory,\
org.springframework.cloud.fn.supplier.sftp.SftpSupplierProperties$Factory, \
org.springframework.cloud.fn.supplier.sftp.SftpSupplierProperties$SortSpec, \
org.springframework.cloud.fn.common.file.FileConsumerProperties, \
org.springframework.cloud.fn.common.metadata.store.MetadataStoreProperties, \
org.springframework.cloud.fn.common.metadata.store.MetadataStoreProperties$Gemfire, \
@@ -8,3 +9,4 @@ configuration-properties.classes=org.springframework.cloud.fn.supplier.sftp.Sftp
org.springframework.cloud.fn.common.metadata.store.MetadataStoreProperties$Jdbc, \
org.springframework.cloud.fn.common.metadata.store.MetadataStoreProperties$Zookeeper, \
org.springframework.cloud.fn.common.metadata.store.MetadataStoreProperties$Mongo \

View File

@@ -18,7 +18,7 @@
<properties>
<revision>3.0.2-SNAPSHOT</revision>
<stream-apps-core.version>${revision}</stream-apps-core.version>
<java-functions.version>1.0.0</java-functions.version>
<java-functions.version>1.0.1-SNAPSHOT</java-functions.version>
<apps.base-image>springcloud/baseimage:1.0.0</apps.base-image>
<mockserver.version>5.10</mockserver.version>
<spring-cloud-stream-dependencies.version>Horsham.SR10</spring-cloud-stream-dependencies.version>

View File

@@ -81,8 +81,8 @@ import org.springframework.util.StringUtils;
*/
@Configuration
@EnableConfigurationProperties({ SftpSupplierProperties.class, FileConsumerProperties.class })
@Import({ SftpSupplierFactoryConfiguration.class })
@EnableConfigurationProperties({SftpSupplierProperties.class, FileConsumerProperties.class})
@Import({SftpSupplierFactoryConfiguration.class})
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
public class SftpSupplierConfiguration {
@@ -236,7 +236,7 @@ public class SftpSupplierConfiguration {
FileConsumerProperties fileConsumerProperties) {
return FileUtils.enhanceFlowForReadingMode(IntegrationFlows
.from(IntegrationReactiveUtils.messageSourceToFlux(sftpMessageSource)),
.from(IntegrationReactiveUtils.messageSourceToFlux(sftpMessageSource)),
fileConsumerProperties)
.toReactivePublisher();
}
@@ -281,7 +281,7 @@ public class SftpSupplierConfiguration {
@Bean
public MessageSource<?> targetMessageSource(PollableChannel listingChannel,
SftpListingMessageProducer sftpListingMessageProducer) {
SftpListingMessageProducer sftpListingMessageProducer) {
return () -> {
sftpListingMessageProducer.listNames();
return (Message<Object>) listingChannel.receive();
@@ -295,7 +295,9 @@ public class SftpSupplierConfiguration {
return new SftpListingMessageProducer(delegatingFactoryWrapper.getFactory(),
remoteDirectory(sftpSupplierProperties),
sftpSupplierProperties.getRemoteFileSeparator());
sftpSupplierProperties.getRemoteFileSeparator(),
sftpSupplierProperties.getSortBy()
);
}
@Bean
@@ -374,23 +376,29 @@ public class SftpSupplierConfiguration {
private final String remoteFileSeparator;
private final SftpSupplierProperties.SortSpec sort;
SftpListingMessageProducer(SessionFactory<?> sessionFactory, String remoteDirectory,
String remoteFileSeparator) {
String remoteFileSeparator, SftpSupplierProperties.SortSpec sort) {
this.sessionFactory = sessionFactory;
this.remoteDirectory = remoteDirectory;
this.remoteFileSeparator = remoteFileSeparator;
this.sort = sort;
}
public void listNames() {
LsEntry[] entries = {};
try {
entries = Stream.of(this.sessionFactory.getSession().list(this.remoteDirectory))
.filter(o -> {
LsEntry lsEntry = (LsEntry) o;
return !(lsEntry.getAttrs().isDir() || lsEntry.getAttrs().isLink());
})
.collect(Collectors.toList()).toArray(entries);
Stream<LsEntry> stream = Stream.of(this.sessionFactory.getSession().list(this.remoteDirectory))
.map(x -> (LsEntry) x)
.filter(x -> !(x.getAttrs().isDir() || x.getAttrs().isLink()));
if (sort != null) {
stream = stream.sorted(sort.comparator());
}
entries = stream.collect(Collectors.toList()).toArray(entries);
}
catch (IOException e) {
throw new MessagingException(e.getMessage(), e);

View File

@@ -20,14 +20,17 @@ import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import javax.validation.Valid;
import javax.validation.constraints.AssertTrue;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import com.jcraft.jsch.ChannelSftp;
import org.hibernate.validator.constraints.Range;
import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -135,6 +138,12 @@ public class SftpSupplierProperties {
*/
private String[] directories;
/**
* Sorting specification for remote files listings. If null, order of entries is undefined.
* Otherwise, entries are sorted by the specified field and direction, according to the type canonical ordering.
*/
private SortSpec sortBy;
@NotBlank
public String getRemoteDir() {
return remoteDir;
@@ -291,6 +300,15 @@ public class SftpSupplierProperties {
return keyDirs;
}
@Valid
public SftpSupplierProperties.SortSpec getSortBy() {
return sortBy;
}
public void setSortBy(SortSpec sortBy) {
this.sortBy = sortBy;
}
public static class Factory {
/**
@@ -403,4 +421,80 @@ public class SftpSupplierProperties {
}
public static class SortSpec {
/**
* Attribute of the file listing entry to sort by (FILENAME, ATIME: last access time, MTIME: last modified time).
*/
private Attribute attribute;
/**
* Sorting direction (ASC or DESC).
*/
private Dir dir = Dir.ASC;
@NotNull
public Attribute getAttribute() {
return attribute;
}
public void setAttribute(Attribute attribute) {
this.attribute = attribute;
}
@NotNull
public Dir getDir() {
return dir;
}
public void setDir(Dir dir) {
this.dir = dir;
}
public enum Attribute {
/**
* Filename attribute.
*/
FILENAME,
/**
* Last access time attribute.
*/
ATIME,
/**
* Last modified time attribute.
*/
MTIME
}
public enum Dir {
/**
* Ascending sort direction.
*/
ASC,
/**
* Descending sort direction.
*/
DESC
}
private Comparator<ChannelSftp.LsEntry> getAttributeComparator() {
switch (attribute) {
case FILENAME:
return Comparator.comparing(ChannelSftp.LsEntry::getFilename);
case ATIME:
return Comparator.comparing(x -> x.getAttrs().getATime());
case MTIME:
return Comparator.comparing(x -> x.getAttrs().getMTime());
}
throw new UnsupportedOperationException("Unsupported sortBy attribute: " + attribute);
}
public Comparator<ChannelSftp.LsEntry> comparator() {
Comparator<ChannelSftp.LsEntry> comparator = getAttributeComparator();
return dir == Dir.ASC ? comparator : comparator.reversed();
}
}
}

View File

@@ -22,7 +22,9 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@@ -118,6 +120,68 @@ public class SftpSupplierApplicationTests extends SftpTestSupport {
});
}
@Test
void supplierForListSortedByFilenameAsc() {
defaultApplicationContextRunner
.withPropertyValues("sftp.supplier.listOnly=true", "sftp.supplier.sortBy.attribute=filename", "sftp.supplier.sortBy.dir=asc")
.run(context -> {
Supplier<Flux<Message<String>>> sftpSupplier = context.getBean("sftpSupplier",
Supplier.class);
SftpSupplierProperties properties = context.getBean(SftpSupplierProperties.class);
List<String> fileNames = new ArrayList<>();
fileNames.add(String.join(properties.getRemoteFileSeparator(), properties.getRemoteDir(),
"sftpSource1.txt"));
fileNames.add(String.join(properties.getRemoteFileSeparator(), properties.getRemoteDir(),
"sftpSource2.txt"));
final AtomicReference<List<String>> expectedFileNames = new AtomicReference<>(fileNames);
StepVerifier.create(sftpSupplier.get())
.assertNext(message -> {
assertThat(message.getPayload()).isEqualTo(expectedFileNames.get().get(0));
assertThat(message.getHeaders().get(MessageHeaders.CONTENT_TYPE))
.isEqualTo(MediaType.TEXT_PLAIN);
})
.assertNext(message -> {
assertThat(message.getPayload()).isEqualTo(expectedFileNames.get().get(1));
assertThat(message.getHeaders().get(MessageHeaders.CONTENT_TYPE))
.isEqualTo(MediaType.TEXT_PLAIN);
})
.expectTimeout(Duration.ofMillis(1000))
.verify(Duration.ofSeconds(30));
});
}
@Test
void supplierForListSortedByFilenameDesc() {
defaultApplicationContextRunner
.withPropertyValues("sftp.supplier.listOnly=true", "sftp.supplier.sortBy.attribute=filename", "sftp.supplier.sortBy.dir=desc")
.run(context -> {
Supplier<Flux<Message<String>>> sftpSupplier = context.getBean("sftpSupplier",
Supplier.class);
SftpSupplierProperties properties = context.getBean(SftpSupplierProperties.class);
List<String> fileNames = new ArrayList<>();
fileNames.add(String.join(properties.getRemoteFileSeparator(), properties.getRemoteDir(),
"sftpSource2.txt"));
fileNames.add(String.join(properties.getRemoteFileSeparator(), properties.getRemoteDir(),
"sftpSource1.txt"));
final AtomicReference<List<String>> expectedFileNames = new AtomicReference<>(fileNames);
StepVerifier.create(sftpSupplier.get())
.assertNext(message -> {
assertThat(message.getPayload()).isEqualTo(expectedFileNames.get().get(0));
assertThat(message.getHeaders().get(MessageHeaders.CONTENT_TYPE))
.isEqualTo(MediaType.TEXT_PLAIN);
})
.assertNext(message -> {
assertThat(message.getPayload()).isEqualTo(expectedFileNames.get().get(1));
assertThat(message.getHeaders().get(MessageHeaders.CONTENT_TYPE))
.isEqualTo(MediaType.TEXT_PLAIN);
})
.expectTimeout(Duration.ofMillis(1000))
.verify(Duration.ofSeconds(30));
});
}
@Test
void supplierForFileRef() {
defaultApplicationContextRunner