GH-9869: Introduce AbstractRecentFileListFilter strategy

Fixes: https://github.com/spring-projects/spring-integration/issues/9869

Currently, FileListFilters on the last modified attribute of a file are limited to the use case of discarding files that are too young.
It would be great to have a filter implementation which would accept files which are not old enough.

* Implement `AbstractRecentFileListFilter` for all the supported file protocols
This commit is contained in:
Artem Bilan
2025-02-27 10:59:20 -05:00
parent 7680e02cbb
commit 8bfb5d248f
14 changed files with 517 additions and 2 deletions

View File

@@ -0,0 +1,81 @@
/*
* Copyright 2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.file.filters;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
/**
 * The {@link FileListFilter} to accept only files which are recent according to the provided {@code age}:
 * a file is accepted when its {@code lastModified} plus the {@code age} is not before the current time.
 * In other words, accept those files which are not old enough yet.
 *
 * @param <F> The type that will be filtered.
 *
 * @author Artem Bilan
 *
 * @since 6.5
 */
public abstract class AbstractRecentFileListFilter<F> implements FileListFilter<F> {

	/**
	 * The number of milliseconds in one second.
	 */
	protected static final long ONE_SECOND = 1000;

	private final Duration age;

	/**
	 * Construct an instance with default age as 1 day.
	 */
	public AbstractRecentFileListFilter() {
		this(Duration.ofDays(1));
	}

	/**
	 * Construct an instance with the provided age.
	 * @param age the maximum age a file may have to be still accepted; must not be {@code null}.
	 * @throws NullPointerException if the {@code age} is {@code null}.
	 */
	public AbstractRecentFileListFilter(Duration age) {
		// Fail fast here instead of with an obscure NPE on the first filtering call.
		this.age = java.util.Objects.requireNonNull(age, "'age' must not be null");
	}

	@Override
	public boolean supportsSingleFileFiltering() {
		return true;
	}

	/**
	 * Return those of the provided files which are not aged.
	 * The current time is captured once, so all the files of the same call
	 * are compared against the same instant.
	 * @param files the files to filter.
	 * @return the list of files which are recent enough.
	 */
	@Override
	public List<F> filterFiles(F[] files) {
		List<F> list = new ArrayList<>();
		Instant now = Instant.now();
		for (F file : files) {
			if (!fileIsAged(file, now)) {
				list.add(file);
			}
		}
		return list;
	}

	/**
	 * Accept the file if it is not aged in comparison with the current time.
	 * @param file the file to check.
	 * @return {@code true} if the file is recent enough.
	 */
	@Override
	public boolean accept(F file) {
		return !fileIsAged(file, Instant.now());
	}

	/**
	 * Check if the file is older than the configured age.
	 * @param file the file to check.
	 * @param now the instant to compare against.
	 * @return {@code true} if the last modified time plus the age is before {@code now}.
	 */
	protected boolean fileIsAged(F file, Instant now) {
		return getLastModified(file).plus(this.age).isBefore(now);
	}

	/**
	 * Obtain the last modified time of the provided file.
	 * @param remoteFile the file to get the last modified time for.
	 * @return the last modified time as an {@link Instant}.
	 */
	protected abstract Instant getLastModified(F remoteFile);

}

View File

@@ -0,0 +1,44 @@
/*
* Copyright 2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.file.filters;
import java.io.File;
import java.time.Duration;
import java.time.Instant;
/**
 * The {@link AbstractRecentFileListFilter} implementation for local file system.
 *
 * @author Artem Bilan
 *
 * @since 6.5
 */
public class RecentFileListFilter extends AbstractRecentFileListFilter<File> {

	/**
	 * Construct an instance with the default age of the super class.
	 */
	public RecentFileListFilter() {
	}

	/**
	 * Construct an instance with the provided age.
	 * @param age the age to pass to the super class.
	 */
	public RecentFileListFilter(Duration age) {
		super(age);
	}

	/**
	 * Return the {@link File#lastModified()} value as an {@link Instant}.
	 * @param file the local file.
	 * @return the last modified time of the file.
	 */
	@Override
	protected Instant getLastModified(File file) {
		// File.lastModified() is in epoch milliseconds: keep the full precision
		// instead of truncating to seconds, which made a file look up to 1 second older.
		return Instant.ofEpochMilli(file.lastModified());
	}

}

View File

@@ -0,0 +1,56 @@
/*
* Copyright 2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.file.filters;
import java.io.File;
import java.io.FileOutputStream;
import java.time.Duration;
import java.time.Instant;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * @author Artem Bilan
 *
 * @since 6.5
 *
 */
public class RecentFileListFilterTests {

	@TempDir
	public File folder;

	/**
	 * A just written file is accepted by the filter with 20 hours age,
	 * and the same file is rejected after its last modified is moved one day back.
	 */
	@Test
	public void testAge() throws Exception {
		RecentFileListFilter filter = new RecentFileListFilter(Duration.ofHours(20));
		File testFile = new File(folder, "test.tmp");
		// try-with-resources: the stream is closed even if the write fails
		try (FileOutputStream fileOutputStream = new FileOutputStream(testFile)) {
			fileOutputStream.write("x".getBytes());
		}
		assertThat(filter.filterFiles(new File[] {testFile})).hasSize(1);
		assertThat(filter.accept(testFile)).isTrue();

		// Verify the timestamp was really changed: otherwise the next assertions fail for a misleading reason
		assertThat(testFile.setLastModified(Instant.now().minus(Duration.ofDays(1)).toEpochMilli())).isTrue();
		assertThat(filter.filterFiles(new File[] {testFile})).hasSize(0);
		assertThat(filter.accept(testFile)).isFalse();
	}

}

View File

@@ -0,0 +1,48 @@
/*
* Copyright 2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.ftp.filters;
import java.time.Duration;
import java.time.Instant;
import org.apache.commons.net.ftp.FTPFile;
import org.springframework.integration.file.filters.AbstractRecentFileListFilter;
/**
 * The {@link AbstractRecentFileListFilter} implementation for FTP protocol.
 *
 * @author Artem Bilan
 *
 * @since 6.5
 */
public class FtpRecentFileListFilter extends AbstractRecentFileListFilter<FTPFile> {

	/**
	 * Create a filter relying on the default age of the super class.
	 */
	public FtpRecentFileListFilter() {
	}

	/**
	 * Create a filter with the provided age.
	 * @param age the age to pass to the super class.
	 */
	public FtpRecentFileListFilter(Duration age) {
		super(age);
	}

	/**
	 * Use the timestamp of the {@link FTPFile} as its last modified time.
	 * @param remoteFile the FTP file entry.
	 * @return the value of {@link FTPFile#getTimestampInstant()}.
	 */
	@Override
	protected Instant getLastModified(FTPFile remoteFile) {
		Instant timestamp = remoteFile.getTimestampInstant();
		return timestamp;
	}

}

View File

@@ -0,0 +1,57 @@
/*
* Copyright 2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.ftp.filters;
import java.time.Duration;
import java.util.Calendar;
import org.apache.commons.net.ftp.FTPFile;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * @author Artem Bilan
 *
 * @since 6.5
 */
public class FtpRecentFileListFilterTests {

	/**
	 * Files with the current timestamp are accepted by the filter with 20 hours age,
	 * and a file with yesterday's timestamp is rejected.
	 */
	@Test
	public void testAge() {
		FtpRecentFileListFilter filter = new FtpRecentFileListFilter(Duration.ofHours(20));
		FTPFile ftpFile1 = new FTPFile();
		ftpFile1.setName("foo");
		ftpFile1.setTimestamp(Calendar.getInstance());
		FTPFile ftpFile2 = new FTPFile();
		ftpFile2.setName("bar");
		ftpFile2.setTimestamp(Calendar.getInstance());
		FTPFile[] files = new FTPFile[] {ftpFile1, ftpFile2};
		assertThat(filter.filterFiles(files)).hasSize(2);
		assertThat(filter.accept(ftpFile1)).isTrue();
		assertThat(filter.accept(ftpFile2)).isTrue();

		// Make a file as of yesterday's
		final Calendar calendar = Calendar.getInstance();
		calendar.add(Calendar.DATE, -1);
		ftpFile2.setTimestamp(calendar);
		assertThat(filter.filterFiles(files)).hasSize(1);
		assertThat(filter.accept(ftpFile1)).isTrue();
		// The aged file must be rejected in the single file mode as well
		assertThat(filter.accept(ftpFile2)).isFalse();
	}

}

View File

@@ -0,0 +1,48 @@
/*
* Copyright 2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.sftp.filters;
import java.time.Duration;
import java.time.Instant;
import org.apache.sshd.sftp.client.SftpClient;
import org.springframework.integration.file.filters.AbstractRecentFileListFilter;
/**
 * The {@link AbstractRecentFileListFilter} implementation for SFTP protocol.
 *
 * @author Artem Bilan
 *
 * @since 6.5
 */
public class SftpRecentFileListFilter extends AbstractRecentFileListFilter<SftpClient.DirEntry> {

	/**
	 * Create a filter relying on the default age of the super class.
	 */
	public SftpRecentFileListFilter() {
	}

	/**
	 * Create a filter with the provided age.
	 * @param age the age to pass to the super class.
	 */
	public SftpRecentFileListFilter(Duration age) {
		super(age);
	}

	/**
	 * Use the modify time from the attributes of the directory entry as its last modified time.
	 * @param remoteFile the SFTP directory entry.
	 * @return the modify time of the entry converted to an {@link Instant}.
	 */
	@Override
	protected Instant getLastModified(SftpClient.DirEntry remoteFile) {
		SftpClient.Attributes attributes = remoteFile.getAttributes();
		return attributes.getModifyTime().toInstant();
	}

}

View File

@@ -0,0 +1,59 @@
/*
* Copyright 2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.sftp.filters;
import java.nio.file.attribute.FileTime;
import java.time.Duration;
import java.time.Instant;
import org.apache.sshd.sftp.client.SftpClient;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * @author Artem Bilan
 *
 * @since 6.5
 */
public class SftpRecentFileListFilterTests {

	/**
	 * Entries modified right now are accepted by the filter with 20 hours age,
	 * and an entry modified one day ago is rejected.
	 */
	@Test
	public void testAge() {
		SftpRecentFileListFilter recentFilter = new SftpRecentFileListFilter(Duration.ofHours(20));

		SftpClient.DirEntry fooEntry = newEntry("foo", Instant.now());
		SftpClient.DirEntry barEntry = newEntry("bar", Instant.now());
		SftpClient.DirEntry[] entries = {fooEntry, barEntry};

		// Both entries are fresh at this point
		assertThat(recentFilter.filterFiles(entries)).hasSize(2);
		assertThat(recentFilter.accept(fooEntry)).isTrue();
		assertThat(recentFilter.accept(barEntry)).isTrue();

		// Move the second entry one day back: it is older than 20 hours now
		Instant yesterday = Instant.now().minus(Duration.ofDays(1));
		barEntry.getAttributes().setModifyTime(FileTime.from(yesterday));

		assertThat(recentFilter.filterFiles(entries)).hasSize(1);
		assertThat(recentFilter.accept(fooEntry)).isTrue();
		assertThat(recentFilter.accept(barEntry)).isFalse();
	}

	/**
	 * Build a directory entry with the same short and long name and the provided modify time.
	 */
	private static SftpClient.DirEntry newEntry(String name, Instant modified) {
		SftpClient.Attributes attributes = new SftpClient.Attributes();
		attributes.setModifyTime(FileTime.from(modified));
		return new SftpClient.DirEntry(name, name, attributes);
	}

}

View File

@@ -0,0 +1,48 @@
/*
* Copyright 2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.smb.filters;
import java.time.Duration;
import java.time.Instant;
import jcifs.smb.SmbFile;
import org.springframework.integration.file.filters.AbstractRecentFileListFilter;
/**
 * The {@link AbstractRecentFileListFilter} implementation for SMB protocol.
 *
 * @author Artem Bilan
 *
 * @since 6.5
 */
public class SmbRecentFileListFilter extends AbstractRecentFileListFilter<SmbFile> {

	/**
	 * Construct an instance with the default age of the super class.
	 */
	public SmbRecentFileListFilter() {
		super();
	}

	/**
	 * Construct an instance with the provided age.
	 * @param age the age to pass to the super class.
	 */
	public SmbRecentFileListFilter(Duration age) {
		super(age);
	}

	/**
	 * Return the {@link SmbFile#getLastModified()} value as an {@link Instant}.
	 * @param remoteFile the SMB file.
	 * @return the last modified time of the file.
	 */
	@Override
	protected Instant getLastModified(SmbFile remoteFile) {
		// The value is treated as epoch milliseconds (the original code divided it by 1000 to get seconds):
		// keep the full precision instead of truncating, which made a file look up to 1 second older.
		return Instant.ofEpochMilli(remoteFile.getLastModified());
	}

}

View File

@@ -0,0 +1,56 @@
/*
* Copyright 2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.smb.filters;
import java.time.Duration;
import java.time.Instant;
import jcifs.smb.SmbFile;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
 * @author Artem Bilan
 *
 * @since 6.5
 */
public class SmbRecentFileListFilterTests {

	/**
	 * Two files modified right now are accepted by the filter with 20 hours age,
	 * and a file modified one day ago is rejected.
	 */
	@Test
	public void testAge() {
		SmbRecentFileListFilter recentFilter = new SmbRecentFileListFilter(Duration.ofHours(20));

		SmbFile freshFile1 = smbFileModifiedAt(System.currentTimeMillis());
		SmbFile freshFile2 = smbFileModifiedAt(System.currentTimeMillis());
		SmbFile agedFile = smbFileModifiedAt(Instant.now().minus(Duration.ofDays(1)).toEpochMilli());

		SmbFile[] candidates = {freshFile1, freshFile2, agedFile};

		// Only the two fresh files pass the list filtering
		assertThat(recentFilter.filterFiles(candidates)).hasSize(2);

		// The single file mode agrees with the list filtering
		assertThat(recentFilter.accept(freshFile1)).isTrue();
		assertThat(recentFilter.accept(freshFile2)).isTrue();
		assertThat(recentFilter.accept(agedFile)).isFalse();
	}

	/**
	 * Create a mock of the {@link SmbFile} reporting the provided last modified time.
	 */
	private static SmbFile smbFileModifiedAt(long lastModified) {
		SmbFile smbFile = mock();
		when(smbFile.getLastModified()).thenReturn(lastModified);
		return smbFile;
	}

}

View File

@@ -138,7 +138,12 @@ The `CompositeFileListFilter` also implements a `DiscardAwareFileListFilter` and
NOTE: Since `CompositeFileListFilter` matches the files against all delegates, the `discardCallback` may be called several times for the same file.
Starting with version 5.1, the `FileReadingMessageSource` doesn't check a directory for existence and doesn't create it until its `start()` is called (typically via wrapping `SourcePollingChannelAdapter`).
Previously, there was no simple way to prevent an operation system permissions error when referencing the directory, for example from tests, or when permissions are applied later.
Previously, there was no simple way to prevent the operating system permissions error when referencing the directory, for example from tests, or when permissions are applied later.
In contrast to the `LastModifiedFileListFilter`, a `RecentFileListFilter` strategy has been introduced starting with version 6.5.
It is an extension of the `AbstractRecentFileListFilter` for the local file system.
By default, it accepts files which are not older than 1 day.
See its other implementations for the respective remote file protocols.
[[message-headers]]
== Message Headers

View File

@@ -103,6 +103,8 @@ This filter can be configured with an `age` property so that only files older th
The age defaults to 60 seconds, but you should choose an age that is large enough to avoid picking up a file early (due to, say, network glitches).
Look into its Javadoc for more information.
In contrast, starting with version 6.5, an `FtpRecentFileListFilter` has been introduced to accept only those files which are not older than the provided `age`.
[[more-on-file-filtering-and-incomplete-files]]
== More on File Filtering and Incomplete Files

View File

@@ -100,6 +100,8 @@ This filter can be configured with an `age` property so that only files older th
The age defaults to 60 seconds, but you should choose an age that is large enough to avoid picking up a file early (due to, say, network glitches).
Look into its Javadoc for more information.
In contrast, starting with version 6.5, an `SftpRecentFileListFilter` has been introduced to accept only those files which are not older than the provided `age`.
[[more-on-file-filtering-and-large-files]]
== More on File Filtering and Large Files

View File

@@ -162,6 +162,8 @@ This filter can be configured with an `age` property so that only files older th
The age defaults to 60 seconds, but you should choose an age that is large enough to avoid picking up a file early (due to, say, network glitches).
Look into its Javadoc for more information.
In contrast, starting with version 6.5, an `SmbRecentFileListFilter` has been introduced to accept only those files which are not older than the provided `age`.
[[configuring-with-the-java-dsl]]
=== Configuring with the Java DSL

View File

@@ -61,4 +61,11 @@ See xref:mqtt.adoc[MQTT Support] for more information.
The `KafkaMessageSource` and `KafkaMessageDrivenChannelAdapter` now generate `MessageHeaders.ID` and `MessageHeaders.TIMESTAMP` headers by default as the rest of Spring Integration channel adapters.
The behavior can be restored to the previous with injection of the `MessagingMessageConverter` with default settings.
See xref:kafka.adoc[Apache Kafka Support] for more information.
See xref:kafka.adoc[Apache Kafka Support] for more information.
[[x6.5-file-filter-changes]]
== The Recent File Filter Support
The `AbstractRecentFileListFilter` strategy has been introduced to accept only those files which are not old enough according to the provided `age`.
The respective implementations are provided: `RecentFileListFilter`, `FtpRecentFileListFilter`, `SftpRecentFileListFilter` and `SmbRecentFileListFilter`.
See xref:file/reading.adoc[Reading Files] for more information.