GH-120: Fix auto-wiring ambiguity in the AwsS3SupplierConfiguration
Fixes: https://github.com/spring-cloud/spring-functions-catalog/issues/120
This commit is contained in:
@@ -20,7 +20,6 @@ import java.util.List;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
@@ -96,7 +95,7 @@ public class AwsS3SupplierConfiguration {
|
||||
}
|
||||
|
||||
@Bean
|
||||
ChainFileListFilter<S3Object> filter(ConcurrentMetadataStore metadataStore) {
|
||||
ChainFileListFilter<S3Object> s3SupplierFileListFilter(ConcurrentMetadataStore metadataStore) {
|
||||
ChainFileListFilter<S3Object> chainFilter = new ChainFileListFilter<>();
|
||||
if (StringUtils.hasText(this.awsS3SupplierProperties.getFilenamePattern())) {
|
||||
chainFilter
|
||||
@@ -129,8 +128,7 @@ public class AwsS3SupplierConfiguration {
|
||||
}
|
||||
|
||||
@Bean
|
||||
S3InboundFileSynchronizer s3InboundFileSynchronizer(ChainFileListFilter<S3Object> filter) {
|
||||
|
||||
S3InboundFileSynchronizer s3InboundFileSynchronizer(ChainFileListFilter<S3Object> s3SupplierFileListFilter) {
|
||||
S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(this.s3SessionFactory);
|
||||
synchronizer.setDeleteRemoteFiles(this.awsS3SupplierProperties.isDeleteRemoteFiles());
|
||||
synchronizer.setPreserveTimestamp(this.awsS3SupplierProperties.isPreserveTimestamp());
|
||||
@@ -138,8 +136,7 @@ public class AwsS3SupplierConfiguration {
|
||||
synchronizer.setRemoteDirectory(remoteDir);
|
||||
synchronizer.setRemoteFileSeparator(this.awsS3SupplierProperties.getRemoteFileSeparator());
|
||||
synchronizer.setTemporaryFileSuffix(this.awsS3SupplierProperties.getTmpFileSuffix());
|
||||
synchronizer.setFilter(filter);
|
||||
|
||||
synchronizer.setFilter(s3SupplierFileListFilter);
|
||||
return synchronizer;
|
||||
}
|
||||
|
||||
@@ -178,8 +175,8 @@ public class AwsS3SupplierConfiguration {
|
||||
}
|
||||
|
||||
@Bean
|
||||
Publisher<Message<Object>> s3SupplierFlow(ReactiveMessageSourceProducer s3ListingProducer) {
|
||||
return IntegrationFlow.from(s3ListingProducer).split().toReactivePublisher(true);
|
||||
Publisher<Message<Object>> s3SupplierFlow(ReactiveMessageSourceProducer s3ListingMessageProducer) {
|
||||
return IntegrationFlow.from(s3ListingMessageProducer).split().toReactivePublisher(true);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@@ -210,13 +207,13 @@ public class AwsS3SupplierConfiguration {
|
||||
|
||||
@Bean
|
||||
ReactiveMessageSourceProducer s3ListingMessageProducer(S3Client amazonS3, ObjectMapper objectMapper,
|
||||
AwsS3SupplierProperties awsS3SupplierProperties, Predicate<S3Object> filter) {
|
||||
AwsS3SupplierProperties awsS3SupplierProperties, Predicate<S3Object> listOnlyFilter) {
|
||||
return new ReactiveMessageSourceProducer((MessageSource<List<String>>) () -> {
|
||||
List<String> summaryList = amazonS3
|
||||
.listObjects(ListObjectsRequest.builder().bucket(awsS3SupplierProperties.getRemoteDir()).build())
|
||||
.contents()
|
||||
.stream()
|
||||
.filter(filter)
|
||||
.filter(listOnlyFilter)
|
||||
.map((s3Object) -> {
|
||||
try {
|
||||
return objectMapper.writeValueAsString(s3Object.toBuilder());
|
||||
@@ -225,7 +222,7 @@ public class AwsS3SupplierConfiguration {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
.toList();
|
||||
return summaryList.isEmpty() ? null : new GenericMessage<>(summaryList);
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user