GH-8967: maxFetchSize = 1 from the StandardRotationPolicy

Fixes: #8967

In the `fair` mode the `StandardRotationPolicy` re-configures an `AbstractFetchLimitingMessageSource`
for a new directory (and possible new `ConnectionFactory`) in the `beforeReceive()`.
However, with default `maxFetchSize` (or bigger than `1`), the `receive()`` would poll `toBeReceived` internal queue
 for files cached from the previous polling cycle.
Since we rotate the source immediately to a new set of options, all those cached files don't make sense
or even can cause the problem on fetching their content in case of `AbstractRemoteFileStreamingMessageSource`
when we rotate to a new `ConnectionFactory`.

* Call `fetchLimitingMessageSource.setMaxFetchSize(1);` in the `StandardRotationPolicy.beforeReceive()`
when `fair && !this.initialized`

(cherry picked from commit c78630e6a9)
This commit is contained in:
Artem Bilan
2024-02-26 16:59:13 -05:00
parent 768915b994
commit 2d048474c0
2 changed files with 17 additions and 10 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2018-2019 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,7 +24,9 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.log.LogMessage;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource;
import org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource;
import org.springframework.integration.file.remote.session.DelegatingSessionFactory;
import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource;
@@ -33,9 +35,9 @@ import org.springframework.util.Assert;
/**
* Standard rotation policy; iterates over key/directory pairs; when the end is reached,
* starts again at the beginning. If the fair option is true the rotation occurs on every
* poll, regardless of result. Otherwise rotation occurs when the current pair returns no
* poll, regardless of result. Otherwise, rotation occurs when the current pair returns no
* message.
*
* <p>
* Subclasses implement {@code onRotation(MessageSource<?> source)} to configure the
* {@link MessageSource} on each rotation.
*
@@ -78,6 +80,12 @@ public class StandardRotationPolicy implements RotationPolicy {
public void beforeReceive(MessageSource<?> source) {
if (this.fair || !this.initialized) {
configureSource(source);
if (this.fair && !this.initialized
&& source instanceof AbstractFetchLimitingMessageSource<?> fetchLimitingMessageSource) {
this.logger.info(LogMessage.format("Enforce 'maxFetchSize = 1' for '%s' in the 'fair' mode", source));
fetchLimitingMessageSource.setMaxFetchSize(1);
}
this.initialized = true;
}
if (this.logger.isTraceEnabled()) {
@@ -142,11 +150,11 @@ public class StandardRotationPolicy implements RotationPolicy {
* @param source the MessageSource.
*/
protected void onRotation(MessageSource<?> source) {
if (source instanceof AbstractRemoteFileStreamingMessageSource) {
((AbstractRemoteFileStreamingMessageSource<?>) source).setRemoteDirectory(this.current.getDirectory());
if (source instanceof AbstractRemoteFileStreamingMessageSource<?> streamingMessageSource) {
streamingMessageSource.setRemoteDirectory(this.current.getDirectory());
}
else if (source instanceof AbstractInboundFileSynchronizingMessageSource) {
((AbstractInboundFileSynchronizingMessageSource<?>) source).getSynchronizer()
else if (source instanceof AbstractInboundFileSynchronizingMessageSource<?> synchronizingMessageSource) {
synchronizingMessageSource.getSynchronizer()
.setRemoteDirectory(this.current.getDirectory());
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -405,8 +405,7 @@ public class RotatingServersTests extends FtpTestSupport {
public IntegrationFlow flow() {
return IntegrationFlow.from(Ftp.inboundStreamingAdapter(new FtpRemoteFileTemplate(sf()))
.filter(new FtpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
.remoteDirectory(".")
.maxFetchSize(1),
.remoteDirectory("."),
e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
.channel(MessageChannels.queue("files"))
.get();