From f64f77c4acd2a67f539723ad41869e8dd712feea Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Tue, 16 Jul 2024 11:28:29 -0400 Subject: [PATCH] GH-244: Add `UserRecordResponse` adapter for `UserRecordResult` Fixes: #244 The `UserRecordResult` has an `attempts()` property which might be used post-put request logic in the output channel. * Introduce a `KinesisResponse` extension to adapt a `UserRecordResult`. This way the further flow logic may consult low-level result from KPL via existing `AwsHeaders.SERVICE_RESULT` header in the reply message --- .../aws/outbound/KplMessageHandler.java | 14 ++-- .../aws/support/UserRecordResponse.java | 72 +++++++++++++++++++ 2 files changed, 77 insertions(+), 9 deletions(-) create mode 100644 src/main/java/org/springframework/integration/aws/support/UserRecordResponse.java diff --git a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java index 4fab525..1b2fe6b 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2023 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,7 +39,6 @@ import software.amazon.awssdk.awscore.AwsResponse; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; -import software.amazon.awssdk.services.kinesis.model.PutRecordResponse; import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse; import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry; @@ -50,6 +49,7 @@ import org.springframework.core.serializer.support.SerializingConverter; import org.springframework.expression.Expression; import org.springframework.expression.common.LiteralExpression; import org.springframework.integration.aws.support.AwsHeaders; +import org.springframework.integration.aws.support.UserRecordResponse; import org.springframework.integration.expression.ValueExpression; import org.springframework.integration.handler.AbstractMessageHandler; import org.springframework.integration.mapping.HeaderMapper; @@ -305,7 +305,7 @@ public class KplMessageHandler extends AbstractAwsMessageHandler implement @Override protected Map additionalOnSuccessHeaders(AwsRequest request, AwsResponse response) { - if (response instanceof PutRecordResponse putRecordResponse) { + if (response instanceof UserRecordResponse putRecordResponse) { return Map.of(AwsHeaders.SHARD, putRecordResponse.shardId(), AwsHeaders.SEQUENCE_NUMBER, putRecordResponse.sequenceNumber()); } @@ -367,14 +367,10 @@ public class KplMessageHandler extends AbstractAwsMessageHandler implement } } - private CompletableFuture handleUserRecord(UserRecord userRecord) { + private CompletableFuture handleUserRecord(UserRecord userRecord) { ListenableFuture recordResult = this.kinesisProducer.addUserRecord(userRecord); return listenableFutureToCompletableFuture(recordResult) - .thenApply(result -> - PutRecordResponse.builder() - .shardId(result.getShardId()) - .sequenceNumber(result.getSequenceNumber()) - .build()); + .thenApply(UserRecordResponse::new); } private PutRecordRequest buildPutRecordRequest(Message message) { diff --git a/src/main/java/org/springframework/integration/aws/support/UserRecordResponse.java b/src/main/java/org/springframework/integration/aws/support/UserRecordResponse.java new file mode 100644 index 0000000..d0312a2 --- /dev/null +++ b/src/main/java/org/springframework/integration/aws/support/UserRecordResponse.java @@ -0,0 +1,72 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.aws.support; + +import java.util.List; + +import com.amazonaws.services.kinesis.producer.Attempt; +import com.amazonaws.services.kinesis.producer.UserRecordResult; +import software.amazon.awssdk.awscore.AwsResponse; +import software.amazon.awssdk.core.SdkField; +import software.amazon.awssdk.services.kinesis.model.KinesisResponse; +import software.amazon.awssdk.services.kinesis.model.PutRecordResponse; + +/** + * The {@link KinesisResponse} adapter for the KPL {@link UserRecordResult} response. + * + * @author Artem Bilan + * + * @since 3.0.8 + */ +public class UserRecordResponse extends KinesisResponse { + + private final String shardId; + + private final String sequenceNumber; + + private final List attempts; + + public UserRecordResponse(UserRecordResult userRecordResult) { + super(PutRecordResponse.builder()); + this.shardId = userRecordResult.getShardId(); + this.sequenceNumber = userRecordResult.getSequenceNumber(); + this.attempts = userRecordResult.getAttempts(); + } + + public String shardId() { + return this.shardId; + } + + public String sequenceNumber() { + return this.sequenceNumber; + } + + public List attempts() { + return this.attempts; + } + + @Override + public AwsResponse.Builder toBuilder() { + throw new UnsupportedOperationException(); + } + + @Override + public List> sdkFields() { + throw new UnsupportedOperationException(); + } + +}