From 2d048474c0743e91868fa8a740cfd3db89659175 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Mon, 26 Feb 2024 16:59:13 -0500 Subject: [PATCH] GH-8967: maxFetchSize = 1 from the StandardRotationPolicy Fixes: #8967 In the `fair` mode the `StandardRotationPolicy` re-configures an `AbstractFetchLimitingMessageSource` for a new directory (and possible new `ConnectionFactory`) in the `beforeReceive()`. However, with default `maxFetchSize` (or bigger than `1`), the `receive()`` would poll `toBeReceived` internal queue for files cached from the previous polling cycle. Since we rotate the source immediately to a new set of options, all those cached files don't make sense or even can cause the problem on fetching their content in case of `AbstractRemoteFileStreamingMessageSource` when we rotate to a new `ConnectionFactory`. * Call `fetchLimitingMessageSource.setMaxFetchSize(1);` in the `StandardRotationPolicy.beforeReceive()` when `fair && !this.initialized` (cherry picked from commit c78630e6a956a52106b4d352e8c075d428eb0721) --- .../remote/aop/StandardRotationPolicy.java | 22 +++++++++++++------ .../ftp/inbound/RotatingServersTests.java | 5 ++--- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/StandardRotationPolicy.java b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/StandardRotationPolicy.java index de4c9fdde7..6f584a81ee 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/StandardRotationPolicy.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/StandardRotationPolicy.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2019 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,9 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.core.log.LogMessage; import org.springframework.integration.core.MessageSource; +import org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource; import org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource; import org.springframework.integration.file.remote.session.DelegatingSessionFactory; import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource; @@ -33,9 +35,9 @@ import org.springframework.util.Assert; /** * Standard rotation policy; iterates over key/directory pairs; when the end is reached, * starts again at the beginning. If the fair option is true the rotation occurs on every - * poll, regardless of result. Otherwise rotation occurs when the current pair returns no + * poll, regardless of result. Otherwise, rotation occurs when the current pair returns no * message. - * + *

* Subclasses implement {@code onRotation(MessageSource source)} to configure the * {@link MessageSource} on each rotation. * @@ -78,6 +80,12 @@ public class StandardRotationPolicy implements RotationPolicy { public void beforeReceive(MessageSource source) { if (this.fair || !this.initialized) { configureSource(source); + if (this.fair && !this.initialized + && source instanceof AbstractFetchLimitingMessageSource fetchLimitingMessageSource) { + + this.logger.info(LogMessage.format("Enforce 'maxFetchSize = 1' for '%s' in the 'fair' mode", source)); + fetchLimitingMessageSource.setMaxFetchSize(1); + } this.initialized = true; } if (this.logger.isTraceEnabled()) { @@ -142,11 +150,11 @@ public class StandardRotationPolicy implements RotationPolicy { * @param source the MessageSource. */ protected void onRotation(MessageSource source) { - if (source instanceof AbstractRemoteFileStreamingMessageSource) { - ((AbstractRemoteFileStreamingMessageSource) source).setRemoteDirectory(this.current.getDirectory()); + if (source instanceof AbstractRemoteFileStreamingMessageSource streamingMessageSource) { + streamingMessageSource.setRemoteDirectory(this.current.getDirectory()); } - else if (source instanceof AbstractInboundFileSynchronizingMessageSource) { - ((AbstractInboundFileSynchronizingMessageSource) source).getSynchronizer() + else if (source instanceof AbstractInboundFileSynchronizingMessageSource synchronizingMessageSource) { + synchronizingMessageSource.getSynchronizer() .setRemoteDirectory(this.current.getDirectory()); } } diff --git a/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/RotatingServersTests.java b/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/RotatingServersTests.java index a85254787a..39deac9eb0 100644 --- a/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/RotatingServersTests.java +++ b/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/RotatingServersTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -405,8 +405,7 @@ public class RotatingServersTests extends FtpTestSupport { public IntegrationFlow flow() { return IntegrationFlow.from(Ftp.inboundStreamingAdapter(new FtpRemoteFileTemplate(sf())) .filter(new FtpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate")) - .remoteDirectory(".") - .maxFetchSize(1), + .remoteDirectory("."), e -> e.poller(Pollers.fixedDelay(1).advice(advice()))) .channel(MessageChannels.queue("files")) .get();