diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/StandardRotationPolicy.java b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/StandardRotationPolicy.java index de4c9fdde7..6f584a81ee 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/StandardRotationPolicy.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/StandardRotationPolicy.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2019 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,9 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.core.log.LogMessage; import org.springframework.integration.core.MessageSource; +import org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource; import org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource; import org.springframework.integration.file.remote.session.DelegatingSessionFactory; import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource; @@ -33,9 +35,9 @@ import org.springframework.util.Assert; /** * Standard rotation policy; iterates over key/directory pairs; when the end is reached, * starts again at the beginning. If the fair option is true the rotation occurs on every - * poll, regardless of result. Otherwise rotation occurs when the current pair returns no + * poll, regardless of result. Otherwise, rotation occurs when the current pair returns no * message. - * + *
* Subclasses implement {@code onRotation(MessageSource> source)} to configure the * {@link MessageSource} on each rotation. * @@ -78,6 +80,12 @@ public class StandardRotationPolicy implements RotationPolicy { public void beforeReceive(MessageSource> source) { if (this.fair || !this.initialized) { configureSource(source); + if (this.fair && !this.initialized + && source instanceof AbstractFetchLimitingMessageSource> fetchLimitingMessageSource) { + + this.logger.info(LogMessage.format("Enforce 'maxFetchSize = 1' for '%s' in the 'fair' mode", source)); + fetchLimitingMessageSource.setMaxFetchSize(1); + } this.initialized = true; } if (this.logger.isTraceEnabled()) { @@ -142,11 +150,11 @@ public class StandardRotationPolicy implements RotationPolicy { * @param source the MessageSource. */ protected void onRotation(MessageSource> source) { - if (source instanceof AbstractRemoteFileStreamingMessageSource) { - ((AbstractRemoteFileStreamingMessageSource>) source).setRemoteDirectory(this.current.getDirectory()); + if (source instanceof AbstractRemoteFileStreamingMessageSource> streamingMessageSource) { + streamingMessageSource.setRemoteDirectory(this.current.getDirectory()); } - else if (source instanceof AbstractInboundFileSynchronizingMessageSource) { - ((AbstractInboundFileSynchronizingMessageSource>) source).getSynchronizer() + else if (source instanceof AbstractInboundFileSynchronizingMessageSource> synchronizingMessageSource) { + synchronizingMessageSource.getSynchronizer() .setRemoteDirectory(this.current.getDirectory()); } } diff --git a/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/RotatingServersTests.java b/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/RotatingServersTests.java index a85254787a..39deac9eb0 100644 --- a/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/RotatingServersTests.java +++ b/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/RotatingServersTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -405,8 +405,7 @@ public class RotatingServersTests extends FtpTestSupport { public IntegrationFlow flow() { return IntegrationFlow.from(Ftp.inboundStreamingAdapter(new FtpRemoteFileTemplate(sf())) .filter(new FtpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate")) - .remoteDirectory(".") - .maxFetchSize(1), + .remoteDirectory("."), e -> e.poller(Pollers.fixedDelay(1).advice(advice()))) .channel(MessageChannels.queue("files")) .get();