From 2e63b2c6820fbd48eaacf91dd4f1668a733c7e4e Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 22 Nov 2017 12:13:32 -0500 Subject: [PATCH] GH-62: Align Message Handlers for common API Fixes: spring-projects/spring-integration-aws#62 * Make `SnsMessageHandler extends AbstractAwsMessageHandler` * Remove SNS Outbound Gateway variant since `SnsMessageHandler` covers that part via `successChannel` and `failureChannel` * Fix XSD for SQS and SNS * Fix SQS and SNS tests according their logic changes * Fix README for new changes --- README.md | 44 +-- .../aws/config/xml/AwsNamespaceHandler.java | 4 +- .../aws/config/xml/AwsParserUtils.java | 25 +- .../xml/SnsOutboundChannelAdapterParser.java | 38 ++- .../config/xml/SnsOutboundGatewayParser.java | 73 ----- .../SqsMessageDrivenChannelAdapterParser.java | 6 +- .../xml/SqsOutboundChannelAdapterParser.java | 42 ++- .../outbound/AbstractAwsMessageHandler.java | 221 +++++++++++++++ .../aws/outbound/KinesisMessageHandler.java | 189 ++----------- .../aws/outbound/SnsMessageHandler.java | 82 +++--- .../aws/outbound/SqsMessageHandler.java | 165 ++++++++++-- .../integration/aws/support/AwsHeaders.java | 7 +- .../aws/config/spring-integration-aws-2.0.xsd | 252 ++++++++++++------ ...boundChannelAdapterParserTests-context.xml | 35 +-- .../SnsOutboundChannelAdapterParserTests.java | 86 +++--- .../SqsMessageHandlerParserTests-context.xml | 28 ++ .../xml/SqsMessageHandlerParserTests.java | 58 +++- ...ChannelAdapterParserTests-context-good.xml | 9 - .../SqsOutboundChannelAdapterParserTests.java | 24 +- .../AbstractSqsMessageHandlerTests.java | 94 ------- .../aws/outbound/SnsMessageHandlerTests.java | 44 ++- .../aws/outbound/SqsMessageHandlerTests.java | 66 ++++- ...andlerWithQueueMessagingTemplateTests.java | 81 ------ 23 files changed, 903 insertions(+), 770 deletions(-) delete mode 100644 src/main/java/org/springframework/integration/aws/config/xml/SnsOutboundGatewayParser.java create mode 100644 src/main/java/org/springframework/integration/aws/outbound/AbstractAwsMessageHandler.java delete mode 100644 src/test/java/org/springframework/integration/aws/outbound/AbstractSqsMessageHandlerTests.java delete mode 100644 src/test/java/org/springframework/integration/aws/outbound/SqsMessageHandlerWithQueueMessagingTemplateTests.java diff --git a/README.md b/README.md index 11fe449..ec4fe68 100644 --- a/README.md +++ b/README.md @@ -260,12 +260,9 @@ background components and core configuration, please, refer to the documentation ### Outbound Channel Adapter -The SQS Outbound Channel Adapter is presented by the `SqsMessageHandler` implementation -(``) and allows to send message to the SQS `queue` with provided `AmazonSQS` -client. An SQS queue can be configured explicitly on the adapter (using -`org.springframework.integration.expression.ValueExpression`) or as a SpEL `Expression`, which is evaluated against -request message as a root object of evaluation context. In addition the `queue` can be extracted from the message -headers under `AwsHeaders.QUEUE`. +The SQS Outbound Channel Adapter is presented by the `SqsMessageHandler` implementation (``) and allows to send message to the SQS `queue` with provided `AmazonSQS` client. +An SQS queue can be configured explicitly on the adapter (using `org.springframework.integration.expression.ValueExpression`) or as a SpEL `Expression`, which is evaluated against request message as a root object of evaluation context. +In addition the `queue` can be extracted from the message headers under `AwsHeaders.QUEUE`. The Java Configuration is pretty simple: @@ -273,18 +270,10 @@ The Java Configuration is pretty simple: @SpringBootApplication public static class MyConfiguration { - @Autowired - private AmazonSQSAsync amazonSqs; - - @Bean - public QueueMessagingTemplate queueMessagingTemplate() { - return new QueueMessagingTemplate(this.amazonSqs); - } - @Bean @ServiceActivator(inputChannel = "sqsSendChannel") public MessageHandler sqsMessageHandler() { - return new SqsMessageHandler(queueMessagingTemplate()); + return new SqsMessageHandler(AmazonSQSAsync amazonSqs); } } @@ -306,7 +295,7 @@ The SQS Inbound Channel Adapter is a `message-driven` implementation for the `Me `SqsMessageDrivenChannelAdapter`. This channel adapter is based on the `org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer` to receive messages from the provided `queues` in async manner and send an enhanced Spring Integration Message to the provided `MessageChannel`. -The enhancements includes `AwsHeaders.MESSAGE_ID`, `AwsHeaders.RECEIPT_HANDLE` and `AwsHeaders.QUEUE` message headers. +The enhancements includes `AwsHeaders.MESSAGE_ID`, `AwsHeaders.RECEIPT_HANDLE` and `AwsHeaders.RECEIVED_QUEUE` message headers. The Java Configuration is pretty simple: @@ -477,26 +466,6 @@ The XML variant may look like: body-expression="payload.toUpperCase()"/> ```` -### Outbound Gateway - -The `` is fully similar to the one-way channel adapter. -The only difference that in gateway mode the `SnsMessageHandler` produces the reply `Message` as: - -````java -return getMessageBuilderFactory() - .withPayload(publishRequest) - .setHeader(AwsHeaders.TOPIC, publishRequest.getTopicArn()) - .setHeader(AwsHeaders.SNS_PUBLISHED_MESSAGE_ID, publishResult.getMessageId()); -```` - -The reply from the `` may be useful to track and correlate the SNS message -in the downstream flow. - -With Java configuration there is just enough to use constructor of `SnsMessageHandler` with `produceReply` boolean -flag to `true` ot switch it to the gateway mode. -By default the `SnsMessageHandler` is one-way `MessageHandler`. - - ## Metadata Store for Amazon DynamoDB The `DynamoDbMetaDataStore`, a `ConcurrentMetadataStore` implementation, is provided to keep the metadata for Spring Integration components in the distributed Amazon DynamoDB store. @@ -555,6 +524,9 @@ This channel adapter can be configured with the `DynamoDbMetaDataStore` mentione By default this adapter uses `DeserializingConverter` to convert `byte[]` from the `Record` data. Can be specified as `null` with meaning no conversion and the target `Message` is sent with the `byte[]` payload. +Additional headers like `AwsHeaders.RECEIVED_STREAM`, `AwsHeaders.RECEIVED_PARTITION_KEY` and `AwsHeaders.RECEIVED_SEQUENCE_NUMBER` are populated to the message for downstream logic. +When `CheckpointMode.manual` is used the `Checkpointer` instance is populated to the `AwsHeaders.CHECKPOINTER` header for acknowledgment in the downstream logic manually. + The consumer group is included to the metadata store `key`. When records are consumed, they are filtered by the last stored `lastCheckpoint` under the key as `[CONSUMER_GROUP]:[STREAM]:[SHARD_ID]`. diff --git a/src/main/java/org/springframework/integration/aws/config/xml/AwsNamespaceHandler.java b/src/main/java/org/springframework/integration/aws/config/xml/AwsNamespaceHandler.java index c99817c..5705fb1 100644 --- a/src/main/java/org/springframework/integration/aws/config/xml/AwsNamespaceHandler.java +++ b/src/main/java/org/springframework/integration/aws/config/xml/AwsNamespaceHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2016 the original author or authors. + * Copyright 2013-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ import org.springframework.integration.config.xml.AbstractIntegrationNamespaceHa * * @author Amol Nayak * @author Artem Bilan + * * @since 0.5 */ public class AwsNamespaceHandler extends AbstractIntegrationNamespaceHandler { @@ -37,7 +38,6 @@ public class AwsNamespaceHandler extends AbstractIntegrationNamespaceHandler { registerBeanDefinitionParser("sqs-message-driven-channel-adapter", new SqsMessageDrivenChannelAdapterParser()); registerBeanDefinitionParser("sns-inbound-channel-adapter", new SnsInboundChannelAdapterParser()); registerBeanDefinitionParser("sns-outbound-channel-adapter", new SnsOutboundChannelAdapterParser()); - registerBeanDefinitionParser("sns-outbound-gateway", new SnsOutboundGatewayParser()); } } diff --git a/src/main/java/org/springframework/integration/aws/config/xml/AwsParserUtils.java b/src/main/java/org/springframework/integration/aws/config/xml/AwsParserUtils.java index c49b308..c14f129 100644 --- a/src/main/java/org/springframework/integration/aws/config/xml/AwsParserUtils.java +++ b/src/main/java/org/springframework/integration/aws/config/xml/AwsParserUtils.java @@ -16,6 +16,14 @@ package org.springframework.integration.aws.config.xml; +import org.w3c.dom.Element; + +import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.xml.ParserContext; +import org.springframework.core.Conventions; +import org.springframework.integration.config.xml.IntegrationNamespaceUtils; + /** * The utility class for the namespace parsers. * @@ -46,13 +54,20 @@ public final class AwsParserUtils { */ public static final String RESOURCE_ID_RESOLVER_REF = "resource-id-resolver"; - /** - * The 'queue-messaging-template' reference attribute name. - */ - public static final String QUEUE_MESSAGING_TEMPLATE_REF = "queue-messaging-template"; - private AwsParserUtils() { super(); } + static void populateExpressionAttribute(String attributeName, BeanDefinitionBuilder builder, + Element element, ParserContext parserContext) { + + BeanDefinition beanDefinition = + IntegrationNamespaceUtils.createExpressionDefinitionFromValueOrExpression(attributeName, + attributeName + "-expression", parserContext, element, false); + if (beanDefinition != null) { + builder.addPropertyValue(Conventions.attributeNameToPropertyName(attributeName) + "Expression", + beanDefinition); + } + } + } diff --git a/src/main/java/org/springframework/integration/aws/config/xml/SnsOutboundChannelAdapterParser.java b/src/main/java/org/springframework/integration/aws/config/xml/SnsOutboundChannelAdapterParser.java index 765c95c..5785a4b 100644 --- a/src/main/java/org/springframework/integration/aws/config/xml/SnsOutboundChannelAdapterParser.java +++ b/src/main/java/org/springframework/integration/aws/config/xml/SnsOutboundChannelAdapterParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2016-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,9 +18,13 @@ package org.springframework.integration.aws.config.xml; import org.w3c.dom.Element; +import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.support.AbstractBeanDefinition; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.xml.ParserContext; +import org.springframework.integration.aws.outbound.SnsMessageHandler; import org.springframework.integration.config.xml.AbstractOutboundChannelAdapterParser; +import org.springframework.integration.config.xml.IntegrationNamespaceUtils; /** * The parser for the {@code }. @@ -31,12 +35,32 @@ public class SnsOutboundChannelAdapterParser extends AbstractOutboundChannelAdap @Override protected AbstractBeanDefinition parseConsumer(Element element, ParserContext parserContext) { - AbstractBeanDefinition beanDefinition = - new SnsOutboundGatewayParser() - .parseHandler(element, parserContext) - .getBeanDefinition(); - beanDefinition.getConstructorArgumentValues().addIndexedArgumentValue(1, false); - return beanDefinition; + String sns = element.getAttribute(AwsParserUtils.SNS_REF); + + BeanDefinitionBuilder builder = + BeanDefinitionBuilder.genericBeanDefinition(SnsMessageHandler.class) + .addConstructorArgReference(sns); + + AwsParserUtils.populateExpressionAttribute("topic-arn", builder, element, parserContext); + AwsParserUtils.populateExpressionAttribute("subject", builder, element, parserContext); + + BeanDefinition message = + IntegrationNamespaceUtils.createExpressionDefIfAttributeDefined("body-expression", element); + if (message != null) { + builder.addPropertyValue("bodyExpression", message); + } + + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "resource-id-resolver"); + + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "sync"); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "error-message-strategy"); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "failure-channel"); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "async-handler"); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "success-channel", "outputChannel"); + + AwsParserUtils.populateExpressionAttribute("send-timeout", builder, element, parserContext); + + return builder.getBeanDefinition(); } } diff --git a/src/main/java/org/springframework/integration/aws/config/xml/SnsOutboundGatewayParser.java b/src/main/java/org/springframework/integration/aws/config/xml/SnsOutboundGatewayParser.java deleted file mode 100644 index 0a3f7b4..0000000 --- a/src/main/java/org/springframework/integration/aws/config/xml/SnsOutboundGatewayParser.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright 2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.aws.config.xml; - -import org.w3c.dom.Element; - -import org.springframework.beans.factory.config.BeanDefinition; -import org.springframework.beans.factory.support.BeanDefinitionBuilder; -import org.springframework.beans.factory.xml.ParserContext; -import org.springframework.integration.aws.outbound.SnsMessageHandler; -import org.springframework.integration.config.xml.AbstractConsumerEndpointParser; -import org.springframework.integration.config.xml.IntegrationNamespaceUtils; - -/** - * The parser for the {@code }. - * - * @author Artem Bilan - */ -public class SnsOutboundGatewayParser extends AbstractConsumerEndpointParser { - - @Override - protected String getInputChannelAttributeName() { - return "request-channel"; - } - - @Override - protected BeanDefinitionBuilder parseHandler(Element element, ParserContext parserContext) { - String sns = element.getAttribute(AwsParserUtils.SNS_REF); - - BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SnsMessageHandler.class) - .addConstructorArgReference(sns) - .addConstructorArgValue(true); - - BeanDefinition topic = IntegrationNamespaceUtils.createExpressionDefinitionFromValueOrExpression("topic-arn", - "topic-arn-expression", parserContext, element, false); - if (topic != null) { - builder.addPropertyValue("topicArnExpression", topic); - } - - BeanDefinition subject = IntegrationNamespaceUtils.createExpressionDefinitionFromValueOrExpression("subject", - "subject-expression", parserContext, element, false); - if (subject != null) { - builder.addPropertyValue("subjectExpression", subject); - } - - BeanDefinition message = - IntegrationNamespaceUtils.createExpressionDefIfAttributeDefined("body-expression", element); - if (message != null) { - builder.addPropertyValue("bodyExpression", message); - } - - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "reply-timeout", "sendTimeout"); - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "reply-channel", "outputChannel"); - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "resource-id-resolver"); - - return builder; - } - -} diff --git a/src/main/java/org/springframework/integration/aws/config/xml/SqsMessageDrivenChannelAdapterParser.java b/src/main/java/org/springframework/integration/aws/config/xml/SqsMessageDrivenChannelAdapterParser.java index 6d63bb0..b1ae4e1 100644 --- a/src/main/java/org/springframework/integration/aws/config/xml/SqsMessageDrivenChannelAdapterParser.java +++ b/src/main/java/org/springframework/integration/aws/config/xml/SqsMessageDrivenChannelAdapterParser.java @@ -58,11 +58,7 @@ public class SqsMessageDrivenChannelAdapterParser extends AbstractSingleBeanDefi @Override protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) { - String sqs = element.getAttribute(AwsParserUtils.SQS_REF); - if (!StringUtils.hasText(sqs)) { - parserContext.getReaderContext().error("'sqs' attribute is required.", element); - } - builder.addConstructorArgReference(sqs) + builder.addConstructorArgReference(element.getAttribute(AwsParserUtils.SQS_REF)) .addConstructorArgValue(element.getAttribute("queues")); String channelName = element.getAttribute("channel"); if (!StringUtils.hasText(channelName)) { diff --git a/src/main/java/org/springframework/integration/aws/config/xml/SqsOutboundChannelAdapterParser.java b/src/main/java/org/springframework/integration/aws/config/xml/SqsOutboundChannelAdapterParser.java index fafc932..61cecac 100644 --- a/src/main/java/org/springframework/integration/aws/config/xml/SqsOutboundChannelAdapterParser.java +++ b/src/main/java/org/springframework/integration/aws/config/xml/SqsOutboundChannelAdapterParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2016 the original author or authors. + * Copyright 2015-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,6 @@ package org.springframework.integration.aws.config.xml; import org.w3c.dom.Element; -import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.xml.ParserContext; @@ -39,37 +38,28 @@ public class SqsOutboundChannelAdapterParser extends AbstractOutboundChannelAdap protected AbstractBeanDefinition parseConsumer(Element element, ParserContext parserContext) { BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SqsMessageHandler.class); - String template = element.getAttribute(AwsParserUtils.QUEUE_MESSAGING_TEMPLATE_REF); - boolean hasTemplate = StringUtils.hasText(template); - String sqs = element.getAttribute(AwsParserUtils.SQS_REF); - boolean hasSqs = StringUtils.hasText(sqs); String resourceIdResolver = element.getAttribute(AwsParserUtils.RESOURCE_ID_RESOLVER_REF); boolean hasResourceIdResolver = StringUtils.hasText(resourceIdResolver); - if (hasTemplate && (hasSqs || hasResourceIdResolver)) { - parserContext.getReaderContext().error(AwsParserUtils.QUEUE_MESSAGING_TEMPLATE_REF + - " should not be defined in conjunction with " + AwsParserUtils.SQS_REF - + " or " + AwsParserUtils.RESOURCE_ID_RESOLVER_REF, element); - } - if (!hasTemplate && !hasSqs) { - parserContext.getReaderContext().error("One of " + AwsParserUtils.QUEUE_MESSAGING_TEMPLATE_REF + " or " - + AwsParserUtils.SQS_REF + " must be defined.", element); - } + builder.addConstructorArgReference(element.getAttribute(AwsParserUtils.SQS_REF)); - if (hasSqs) { - builder.addConstructorArgReference(sqs); - } if (hasResourceIdResolver) { builder.addConstructorArgReference(resourceIdResolver); } - if (hasTemplate) { - builder.addConstructorArgReference(template); - } - BeanDefinition queue = IntegrationNamespaceUtils.createExpressionDefinitionFromValueOrExpression("queue", - "queue-expression", parserContext, element, false); - if (queue != null) { - builder.addPropertyValue("queueExpression", queue); - } + + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "sync"); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "message-converter"); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "error-message-strategy"); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "failure-channel"); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "async-handler"); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "success-channel", "outputChannel"); + + AwsParserUtils.populateExpressionAttribute("queue", builder, element, parserContext); + AwsParserUtils.populateExpressionAttribute("delay", builder, element, parserContext); + AwsParserUtils.populateExpressionAttribute("message-group-id", builder, element, parserContext); + AwsParserUtils.populateExpressionAttribute("message-deduplication-id", builder, element, parserContext); + AwsParserUtils.populateExpressionAttribute("send-timeout", builder, element, parserContext); + return builder.getBeanDefinition(); } diff --git a/src/main/java/org/springframework/integration/aws/outbound/AbstractAwsMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/AbstractAwsMessageHandler.java new file mode 100644 index 0000000..7eb54ef --- /dev/null +++ b/src/main/java/org/springframework/integration/aws/outbound/AbstractAwsMessageHandler.java @@ -0,0 +1,221 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.aws.outbound; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.Expression; +import org.springframework.integration.MessageTimeoutException; +import org.springframework.integration.aws.support.AwsHeaders; +import org.springframework.integration.aws.support.AwsRequestFailureException; +import org.springframework.integration.expression.ExpressionUtils; +import org.springframework.integration.expression.ValueExpression; +import org.springframework.integration.handler.AbstractMessageProducingHandler; +import org.springframework.integration.support.AbstractIntegrationMessageBuilder; +import org.springframework.integration.support.DefaultErrorMessageStrategy; +import org.springframework.integration.support.ErrorMessageStrategy; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.ErrorMessage; +import org.springframework.util.Assert; + +import com.amazonaws.AmazonWebServiceRequest; +import com.amazonaws.handlers.AsyncHandler; + +/** + * The base {@link AbstractMessageProducingHandler} for AWS services. + * Utilizes common logic ({@link AsyncHandler}, {@link ErrorMessageStrategy}, + * {@code failureChannel} etc.) and message pre- and post-processing, + * + * @author Artem Bilan + * + * @since 2.0 + */ +public abstract class AbstractAwsMessageHandler extends AbstractMessageProducingHandler { + + protected static final long DEFAULT_SEND_TIMEOUT = 10000; + + private AsyncHandler asyncHandler; + + private EvaluationContext evaluationContext; + + private boolean sync; + + private Expression sendTimeoutExpression = new ValueExpression<>(DEFAULT_SEND_TIMEOUT); + + private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy(); + + private MessageChannel failureChannel; + + private String failureChannelName; + + public void setAsyncHandler(AsyncHandler asyncHandler) { + this.asyncHandler = asyncHandler; + } + + protected AsyncHandler getAsyncHandler() { + return this.asyncHandler; + } + + public void setSync(boolean sync) { + this.sync = sync; + } + + protected boolean isSync() { + return this.sync; + } + + public void setSendTimeout(long sendTimeout) { + setSendTimeoutExpression(new ValueExpression<>(sendTimeout)); + } + + public void setSendTimeoutExpressionString(String sendTimeoutExpression) { + setSendTimeoutExpression(EXPRESSION_PARSER.parseExpression(sendTimeoutExpression)); + } + + public void setSendTimeoutExpression(Expression sendTimeoutExpression) { + Assert.notNull(sendTimeoutExpression, "'sendTimeoutExpression' must not be null"); + this.sendTimeoutExpression = sendTimeoutExpression; + } + + protected Expression getSendTimeoutExpression() { + return this.sendTimeoutExpression; + } + + /** + * Set the failure channel. After a failure on put, an {@link ErrorMessage} will be sent + * to this channel with a payload of a {@link AwsRequestFailureException} with the + * failed message and cause. + * @param failureChannel the failure channel. + */ + public void setFailureChannel(MessageChannel failureChannel) { + this.failureChannel = failureChannel; + } + + /** + * Set the failure channel name. After a failure on put, an {@link ErrorMessage} will be + * sent to this channel name with a payload of a {@link AwsRequestFailureException} + * with the failed message and cause. + * @param failureChannelName the failure channel name. + */ + public void setFailureChannelName(String failureChannelName) { + this.failureChannelName = failureChannelName; + } + + protected MessageChannel getFailureChannel() { + if (this.failureChannel != null) { + return this.failureChannel; + + } + else if (this.failureChannelName != null) { + this.failureChannel = getChannelResolver().resolveDestination(this.failureChannelName); + return this.failureChannel; + } + + return null; + } + + public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) { + Assert.notNull(errorMessageStrategy, "'errorMessageStrategy' must not be null"); + this.errorMessageStrategy = errorMessageStrategy; + } + + protected ErrorMessageStrategy getErrorMessageStrategy() { + return this.errorMessageStrategy; + } + + protected EvaluationContext getEvaluationContext() { + return this.evaluationContext; + } + + @Override + protected void onInit() throws Exception { + super.onInit(); + this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); + } + + @Override + protected void handleMessageInternal(Message message) throws Exception { + Future resultFuture = handleMessageToAws(message); + + if (this.sync) { + Long sendTimeout = this.sendTimeoutExpression.getValue(this.evaluationContext, message, Long.class); + if (sendTimeout == null || sendTimeout < 0) { + resultFuture.get(); + } + else { + try { + resultFuture.get(sendTimeout, TimeUnit.MILLISECONDS); + } + catch (TimeoutException te) { + throw new MessageTimeoutException(message, "Timeout waiting for response from AmazonKinesis", te); + } + } + } + } + + protected AsyncHandler obtainAsyncHandler(final Message message, + final AmazonWebServiceRequest request) { + + return new AsyncHandler() { + + @Override + public void onError(Exception ex) { + if (getAsyncHandler() != null) { + getAsyncHandler().onError(ex); + } + + if (getFailureChannel() != null) { + AbstractAwsMessageHandler.this.messagingTemplate.send(getFailureChannel(), + getErrorMessageStrategy() + .buildErrorMessage(new AwsRequestFailureException(message, request, ex), null)); + } + } + + @Override + @SuppressWarnings("unchecked") + public void onSuccess(I request, O result) { + if (getAsyncHandler() != null) { + ((AsyncHandler) getAsyncHandler()).onSuccess(request, result); + } + + if (getOutputChannel() != null) { + AbstractIntegrationMessageBuilder messageBuilder = + getMessageBuilderFactory() + .fromMessage(message); + + additionalOnSuccessHeaders(messageBuilder, request, result); + + messageBuilder.setHeaderIfAbsent(AwsHeaders.SERVICE_RESULT, result); + + + AbstractAwsMessageHandler.this.messagingTemplate.send(getOutputChannel(), messageBuilder.build()); + } + } + + }; + } + + protected abstract Future handleMessageToAws(Message message); + + protected abstract void additionalOnSuccessHeaders(AbstractIntegrationMessageBuilder messageBuilder, + AmazonWebServiceRequest request, Object result); + +} diff --git a/src/main/java/org/springframework/integration/aws/outbound/KinesisMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KinesisMessageHandler.java index 3ccc074..be60117 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KinesisMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KinesisMessageHandler.java @@ -18,33 +18,19 @@ package org.springframework.integration.aws.outbound; import java.nio.ByteBuffer; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.springframework.core.convert.converter.Converter; import org.springframework.core.serializer.support.SerializingConverter; -import org.springframework.expression.EvaluationContext; import org.springframework.expression.Expression; import org.springframework.expression.common.LiteralExpression; -import org.springframework.integration.MessageTimeoutException; import org.springframework.integration.aws.support.AwsHeaders; -import org.springframework.integration.aws.support.AwsRequestFailureException; -import org.springframework.integration.expression.ExpressionUtils; -import org.springframework.integration.expression.ValueExpression; import org.springframework.integration.handler.AbstractMessageHandler; -import org.springframework.integration.handler.AbstractMessageProducingHandler; import org.springframework.integration.support.AbstractIntegrationMessageBuilder; -import org.springframework.integration.support.DefaultErrorMessageStrategy; -import org.springframework.integration.support.ErrorMessageStrategy; import org.springframework.messaging.Message; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.support.ErrorMessage; import org.springframework.util.Assert; import org.springframework.util.StringUtils; import com.amazonaws.AmazonWebServiceRequest; -import com.amazonaws.AmazonWebServiceResult; -import com.amazonaws.ResponseMetadata; import com.amazonaws.handlers.AsyncHandler; import com.amazonaws.services.kinesis.AmazonKinesisAsync; import com.amazonaws.services.kinesis.model.PutRecordRequest; @@ -64,18 +50,12 @@ import com.amazonaws.services.kinesis.model.PutRecordsResult; * @see AmazonKinesisAsync#putRecords(PutRecordsRequest) * @see com.amazonaws.handlers.AsyncHandler */ -public class KinesisMessageHandler extends AbstractMessageProducingHandler { - - private static final long DEFAULT_SEND_TIMEOUT = 10000; +public class KinesisMessageHandler extends AbstractAwsMessageHandler { private final AmazonKinesisAsync amazonKinesis; - private AsyncHandler asyncHandler; - private Converter converter = new SerializingConverter(); - private EvaluationContext evaluationContext; - private volatile Expression streamExpression; private volatile Expression partitionKeyExpression; @@ -84,25 +64,11 @@ public class KinesisMessageHandler extends AbstractMessageProducingHandler { private volatile Expression sequenceNumberExpression; - private boolean sync; - - private Expression sendTimeoutExpression = new ValueExpression<>(DEFAULT_SEND_TIMEOUT); - - private MessageChannel failureChannel; - - private String failureChannelName; - - private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy(); - public KinesisMessageHandler(AmazonKinesisAsync amazonKinesis) { Assert.notNull(amazonKinesis, "'amazonKinesis' must not be null."); this.amazonKinesis = amazonKinesis; } - public void setAsyncHandler(AsyncHandler asyncHandler) { - this.asyncHandler = asyncHandler; - } - /** * Specify a {@link Converter} to serialize {@code payload} to the {@code byte[]} * if that isn't {@code byte[]} already. @@ -157,106 +123,31 @@ public class KinesisMessageHandler extends AbstractMessageProducingHandler { this.sequenceNumberExpression = sequenceNumberExpression; } - public void setSync(boolean sync) { - this.sync = sync; - } - - public void setSendTimeout(long sendTimeout) { - setSendTimeoutExpression(new ValueExpression<>(sendTimeout)); - } - - public void setSendTimeoutExpressionString(String sendTimeoutExpression) { - setSendTimeoutExpression(EXPRESSION_PARSER.parseExpression(sendTimeoutExpression)); - } - - public void setSendTimeoutExpression(Expression sendTimeoutExpression) { - this.sendTimeoutExpression = sendTimeoutExpression; - } - - /** - * Set the failure channel. After a failure on put, an {@link ErrorMessage} will be sent - * to this channel with a payload of a {@link AwsRequestFailureException} with the - * failed message and cause. - * @param failureChannel the failure channel. - * @since 1.1.0 - */ - public void setFailureChannel(MessageChannel failureChannel) { - this.failureChannel = failureChannel; - } - - protected MessageChannel getFailureChannel() { - if (this.failureChannel != null) { - return this.failureChannel; - - } - else if (this.failureChannelName != null) { - this.failureChannel = getChannelResolver().resolveDestination(this.failureChannelName); - return this.failureChannel; - } - - return null; - } - - /** - * Set the failure channel name. After a failure on put, an {@link ErrorMessage} will be - * sent to this channel name with a payload of a {@link AwsRequestFailureException} - * with the failed message and cause. - * @param failureChannelName the failure channel name. - * @since 1.1.0 - */ - public void setFailureChannelName(String failureChannelName) { - this.failureChannelName = failureChannelName; - } - @Override - protected void onInit() throws Exception { - super.onInit(); - this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); - } - - @Override - @SuppressWarnings("unchecked") - protected void handleMessageInternal(final Message message) throws Exception { - Future resultFuture; - + protected Future handleMessageToAws(Message message) { if (message.getPayload() instanceof PutRecordsRequest) { - AsyncHandler asyncHandler = - obtainAsyncHandler(message, (PutRecordsRequest) message.getPayload()); + obtainAsyncHandler(message, (PutRecordsRequest) message.getPayload()); - resultFuture = this.amazonKinesis.putRecordsAsync((PutRecordsRequest) message.getPayload(), asyncHandler); + return this.amazonKinesis.putRecordsAsync((PutRecordsRequest) message.getPayload(), asyncHandler); } else { - final PutRecordRequest putRecordRequest = (message.getPayload() instanceof PutRecordRequest) - ? (PutRecordRequest) message.getPayload() - : buildPutRecordRequest(message); + final PutRecordRequest putRecordRequest = + (message.getPayload() instanceof PutRecordRequest) + ? (PutRecordRequest) message.getPayload() + : buildPutRecordRequest(message); AsyncHandler asyncHandler = obtainAsyncHandler(message, putRecordRequest); - resultFuture = this.amazonKinesis.putRecordAsync(putRecordRequest, asyncHandler); - } - - if (this.sync) { - Long sendTimeout = this.sendTimeoutExpression.getValue(this.evaluationContext, message, Long.class); - if (sendTimeout == null || sendTimeout < 0) { - resultFuture.get(); - } - else { - try { - resultFuture.get(sendTimeout, TimeUnit.MILLISECONDS); - } - catch (TimeoutException te) { - throw new MessageTimeoutException(message, "Timeout waiting for response from AmazonKinesis", te); - } - } + return this.amazonKinesis.putRecordAsync(putRecordRequest, asyncHandler); } } private PutRecordRequest buildPutRecordRequest(Message message) { String stream = message.getHeaders().get(AwsHeaders.STREAM, String.class); if (!StringUtils.hasText(stream) && this.streamExpression != null) { - stream = this.streamExpression.getValue(this.evaluationContext, message, String.class); + stream = this.streamExpression.getValue(getEvaluationContext(), message, String.class); } Assert.state(stream != null, "'stream' must not be null for sending a Kinesis record. " + "Consider configuring this handler with a 'stream'( or 'streamExpression') or supply an " + @@ -264,7 +155,7 @@ public class KinesisMessageHandler extends AbstractMessageProducingHandler { String partitionKey = message.getHeaders().get(AwsHeaders.PARTITION_KEY, String.class); if (!StringUtils.hasText(partitionKey) && this.partitionKeyExpression != null) { - partitionKey = this.partitionKeyExpression.getValue(this.evaluationContext, message, String.class); + partitionKey = this.partitionKeyExpression.getValue(getEvaluationContext(), message, String.class); } Assert.state(partitionKey != null, "'partitionKey' must not be null for sending a Kinesis record. " + "Consider configuring this handler with a 'partitionKey'( or 'partitionKeyExpression') or supply an " + @@ -272,12 +163,12 @@ public class KinesisMessageHandler extends AbstractMessageProducingHandler { String explicitHashKey = (this.explicitHashKeyExpression != null - ? this.explicitHashKeyExpression.getValue(this.evaluationContext, message, String.class) + ? this.explicitHashKeyExpression.getValue(getEvaluationContext(), message, String.class) : null); String sequenceNumber = message.getHeaders().get(AwsHeaders.SEQUENCE_NUMBER, String.class); if (!StringUtils.hasText(stream) && this.streamExpression != null) { - partitionKey = this.sequenceNumberExpression.getValue(this.evaluationContext, message, String.class); + partitionKey = this.sequenceNumberExpression.getValue(getEvaluationContext(), message, String.class); } Object payload = message.getPayload(); @@ -304,53 +195,15 @@ public class KinesisMessageHandler extends AbstractMessageProducingHandler { .withData(data); } - @SuppressWarnings("rawtypes") - private > AsyncHandler obtainAsyncHandler( - final Message message, final REQUEST request) { + @Override + protected void additionalOnSuccessHeaders(AbstractIntegrationMessageBuilder messageBuilder, + AmazonWebServiceRequest request, Object result) { - return new AsyncHandler() { - - @Override - public void onError(Exception ex) { - if (KinesisMessageHandler.this.asyncHandler != null) { - KinesisMessageHandler.this.asyncHandler.onError(ex); - } - - if (getFailureChannel() != null) { - KinesisMessageHandler.this.messagingTemplate.send(getFailureChannel(), - KinesisMessageHandler.this.errorMessageStrategy.buildErrorMessage( - new AwsRequestFailureException(message, request, ex), null)); - } - } - - @Override - @SuppressWarnings("unchecked") - public void onSuccess(REQUEST request, RESULT result) { - if (KinesisMessageHandler.this.asyncHandler != null) { - ((AsyncHandler) KinesisMessageHandler.this.asyncHandler) - .onSuccess(request, result); - } - - if (getOutputChannel() != null) { - AbstractIntegrationMessageBuilder messageBuilder = - getMessageBuilderFactory() - .fromMessage(message); - - if (result instanceof PutRecordResult) { - messageBuilder - .setHeader(AwsHeaders.SHARD, ((PutRecordResult) result).getShardId()) - .setHeader(AwsHeaders.SEQUENCE_NUMBER, ((PutRecordResult) result).getSequenceNumber()); - } - else { - messageBuilder.setHeader(AwsHeaders.SERVICE_RESULT, result); - } - - - KinesisMessageHandler.this.messagingTemplate.send(getOutputChannel(), messageBuilder.build()); - } - } - - }; + if (result instanceof PutRecordResult) { + messageBuilder + .setHeader(AwsHeaders.SHARD, ((PutRecordResult) result).getShardId()) + .setHeader(AwsHeaders.SEQUENCE_NUMBER, ((PutRecordResult) result).getSequenceNumber()); + } } } diff --git a/src/main/java/org/springframework/integration/aws/outbound/SnsMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/SnsMessageHandler.java index 6841f26..e8d2955 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/SnsMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/SnsMessageHandler.java @@ -16,26 +16,28 @@ package org.springframework.integration.aws.outbound; +import java.util.concurrent.Future; + import org.springframework.cloud.aws.core.env.ResourceIdResolver; -import org.springframework.expression.EvaluationContext; import org.springframework.expression.Expression; import org.springframework.expression.TypeLocator; import org.springframework.expression.common.LiteralExpression; import org.springframework.expression.spel.support.StandardTypeLocator; import org.springframework.integration.aws.support.AwsHeaders; import org.springframework.integration.aws.support.SnsBodyBuilder; -import org.springframework.integration.expression.ExpressionUtils; -import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; +import org.springframework.integration.support.AbstractIntegrationMessageBuilder; import org.springframework.messaging.Message; import org.springframework.util.Assert; -import com.amazonaws.services.sns.AmazonSNS; +import com.amazonaws.AmazonWebServiceRequest; +import com.amazonaws.handlers.AsyncHandler; +import com.amazonaws.services.sns.AmazonSNSAsync; import com.amazonaws.services.sns.model.PublishRequest; import com.amazonaws.services.sns.model.PublishResult; /** - * The {@link AbstractReplyProducingMessageHandler} implementation to send SNS Notifications - * ({@link AmazonSNS#publish(PublishRequest)}) to the provided {@code topicArn} + * The {@link AbstractAwsMessageHandler} implementation to send SNS Notifications + * ({@link AmazonSNSAsync#publishAsync(PublishRequest)}) to the provided {@code topicArn} * (or evaluated at runtime against {@link Message}). *

* The SNS Message subject can be evaluated as a result of {@link #subjectExpression}. @@ -69,27 +71,16 @@ import com.amazonaws.services.sns.model.PublishResult; * to the {@link String} using {@link #getConversionService()}. * * - *

- * If this {@link AbstractReplyProducingMessageHandler} is configured with {@link #produceReply} as - * {@code true}, the reply message is composed to be sent to the {@code outputChannel} or - * {@code replyChannel}. The reply message's {@code payload} is exactly the {@link PublishRequest} - * object, which has been just published to SNS. Also this message has {@link AwsHeaders#TOPIC} - * and {@link AwsHeaders#SNS_PUBLISHED_MESSAGE_ID} headers to track published SNS message in the - * downstream. * * @author Artem Bilan * - * @see AmazonSNS + * @see AmazonSNSAsync * @see PublishRequest * @see SnsBodyBuilder */ -public class SnsMessageHandler extends AbstractReplyProducingMessageHandler { +public class SnsMessageHandler extends AbstractAwsMessageHandler { - private final AmazonSNS amazonSns; - - private final boolean produceReply; - - private EvaluationContext evaluationContext; + private final AmazonSNSAsync amazonSns; private Expression topicArnExpression; @@ -99,14 +90,9 @@ public class SnsMessageHandler extends AbstractReplyProducingMessageHandler { private ResourceIdResolver resourceIdResolver; - public SnsMessageHandler(AmazonSNS amazonSns) { - this(amazonSns, false); - } - - public SnsMessageHandler(AmazonSNS amazonSns, boolean produceReply) { + public SnsMessageHandler(AmazonSNSAsync amazonSns) { Assert.notNull(amazonSns, "amazonSns must not be null."); this.amazonSns = amazonSns; - this.produceReply = produceReply; } public void setTopicArn(String topicArn) { @@ -151,10 +137,9 @@ public class SnsMessageHandler extends AbstractReplyProducingMessageHandler { } @Override - protected void doInit() { - super.doInit(); - this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); - TypeLocator typeLocator = this.evaluationContext.getTypeLocator(); + protected void onInit() throws Exception { + super.onInit(); + TypeLocator typeLocator = getEvaluationContext().getTypeLocator(); if (typeLocator instanceof StandardTypeLocator) { /* * Register the 'org.springframework.integration.aws.support' package @@ -165,8 +150,8 @@ public class SnsMessageHandler extends AbstractReplyProducingMessageHandler { } @Override - protected Object handleRequestMessage(Message requestMessage) { - Object payload = requestMessage.getPayload(); + protected Future handleMessageToAws(Message message) { + Object payload = message.getPayload(); PublishRequest publishRequest = null; @@ -176,21 +161,21 @@ public class SnsMessageHandler extends AbstractReplyProducingMessageHandler { else { Assert.state(this.topicArnExpression != null, "'topicArn' or 'topicArnExpression' must be specified."); publishRequest = new PublishRequest(); - String topicArn = this.topicArnExpression.getValue(this.evaluationContext, requestMessage, String.class); + String topicArn = this.topicArnExpression.getValue(getEvaluationContext(), message, String.class); if (this.resourceIdResolver != null) { topicArn = this.resourceIdResolver.resolveToPhysicalResourceId(topicArn); } publishRequest.setTopicArn(topicArn); if (this.subjectExpression != null) { - String subject = this.subjectExpression.getValue(this.evaluationContext, requestMessage, String.class); + String subject = this.subjectExpression.getValue(getEvaluationContext(), message, String.class); publishRequest.setSubject(subject); } - Object snsMessage = requestMessage.getPayload(); + Object snsMessage = message.getPayload(); if (this.bodyExpression != null) { - snsMessage = this.bodyExpression.getValue(this.evaluationContext, requestMessage); + snsMessage = this.bodyExpression.getValue(getEvaluationContext(), message); } if (snsMessage instanceof SnsBodyBuilder) { @@ -202,16 +187,25 @@ public class SnsMessageHandler extends AbstractReplyProducingMessageHandler { } } - PublishResult publishResult = this.amazonSns.publish(publishRequest); + AsyncHandler asyncHandler = obtainAsyncHandler(message, publishRequest); + return this.amazonSns.publishAsync(publishRequest, asyncHandler); - if (this.produceReply) { - return getMessageBuilderFactory() - .withPayload(publishRequest) - .setHeader(AwsHeaders.TOPIC, publishRequest.getTopicArn()) - .setHeader(AwsHeaders.SNS_PUBLISHED_MESSAGE_ID, publishResult.getMessageId()); + } + + @Override + protected void additionalOnSuccessHeaders(AbstractIntegrationMessageBuilder messageBuilder, + AmazonWebServiceRequest request, Object result) { + + if (request instanceof PublishRequest) { + PublishRequest publishRequest = (PublishRequest) request; + + messageBuilder.setHeader(AwsHeaders.TOPIC, publishRequest.getTopicArn()); } - else { - return null; + + if (result instanceof PublishResult) { + PublishResult publishResult = (PublishResult) result; + + messageBuilder.setHeader(AwsHeaders.MESSAGE_ID, publishResult.getMessageId()); } } diff --git a/src/main/java/org/springframework/integration/aws/outbound/SqsMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/SqsMessageHandler.java index bcf2236..445864b 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/SqsMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/SqsMessageHandler.java @@ -16,19 +16,30 @@ package org.springframework.integration.aws.outbound; +import java.util.concurrent.Future; + import org.springframework.cloud.aws.core.env.ResourceIdResolver; -import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate; -import org.springframework.expression.EvaluationContext; +import org.springframework.cloud.aws.messaging.support.destination.DynamicQueueUrlDestinationResolver; import org.springframework.expression.Expression; import org.springframework.expression.common.LiteralExpression; import org.springframework.integration.aws.support.AwsHeaders; -import org.springframework.integration.expression.ExpressionUtils; +import org.springframework.integration.expression.ValueExpression; import org.springframework.integration.handler.AbstractMessageHandler; +import org.springframework.integration.support.AbstractIntegrationMessageBuilder; import org.springframework.messaging.Message; +import org.springframework.messaging.converter.GenericMessageConverter; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.core.DestinationResolver; import org.springframework.util.Assert; import org.springframework.util.StringUtils; +import com.amazonaws.AmazonWebServiceRequest; +import com.amazonaws.handlers.AsyncHandler; import com.amazonaws.services.sqs.AmazonSQSAsync; +import com.amazonaws.services.sqs.model.SendMessageBatchRequest; +import com.amazonaws.services.sqs.model.SendMessageBatchResult; +import com.amazonaws.services.sqs.model.SendMessageRequest; +import com.amazonaws.services.sqs.model.SendMessageResult; /** * The {@link AbstractMessageHandler} implementation for the Amazon SQS {@code sendMessage}. @@ -37,33 +48,43 @@ import com.amazonaws.services.sqs.AmazonSQSAsync; * @author Rahul Pilani * @author Taylor Wicksell * - * @see QueueMessagingTemplate - * @see org.springframework.cloud.aws.messaging.core.QueueMessageChannel - */ -public class SqsMessageHandler extends AbstractMessageHandler { + * @see AmazonSQSAsync#sendMessageAsync(SendMessageRequest, AsyncHandler) + * @see com.amazonaws.handlers.AsyncHandler - private final QueueMessagingTemplate template; + */ +public class SqsMessageHandler extends AbstractAwsMessageHandler { + + private final AmazonSQSAsync amazonSqs; + + private final DestinationResolver destinationResolver; + + private MessageConverter messageConverter; private Expression queueExpression; - private EvaluationContext evaluationContext; + private Expression delayExpression; + + private Expression messageGroupIdExpression; + + private Expression messageDeduplicationIdExpression; + public SqsMessageHandler(AmazonSQSAsync amazonSqs) { this(amazonSqs, null); } public SqsMessageHandler(AmazonSQSAsync amazonSqs, ResourceIdResolver resourceIdResolver) { - this(new QueueMessagingTemplate(amazonSqs, resourceIdResolver)); - } - - public SqsMessageHandler(QueueMessagingTemplate template) { - Assert.notNull(template, "template must not be null."); - this.template = template; + this.amazonSqs = amazonSqs; + this.destinationResolver = new DynamicQueueUrlDestinationResolver(amazonSqs, resourceIdResolver); } public void setQueue(String queue) { Assert.hasText(queue, "'queue' must not be empty"); - this.queueExpression = new LiteralExpression(queue); + setQueueExpression(new LiteralExpression(queue)); + } + + public void setQueueExpressionString(String queueExpression) { + setQueueExpression(EXPRESSION_PARSER.parseExpression(queueExpression)); } public void setQueueExpression(Expression queueExpression) { @@ -71,22 +92,116 @@ public class SqsMessageHandler extends AbstractMessageHandler { this.queueExpression = queueExpression; } + public void setDelay(int delaySeconds) { + setDelayExpression(new ValueExpression<>(delaySeconds)); + } + + public void setDelayExpressionString(String delayExpression) { + setDelayExpression(EXPRESSION_PARSER.parseExpression(delayExpression)); + } + + public void setDelayExpression(Expression delayExpression) { + Assert.notNull(delayExpression, "'delayExpression' must not be null"); + this.delayExpression = delayExpression; + } + + public void setMessageGroupId(String messageGroupId) { + setMessageGroupIdExpression(new LiteralExpression(messageGroupId)); + } + + public void setMessageGroupIdExpressionString(String groupIdExpression) { + setMessageGroupIdExpression(EXPRESSION_PARSER.parseExpression(groupIdExpression)); + } + + public void setMessageGroupIdExpression(Expression messageGroupIdExpression) { + Assert.notNull(messageGroupIdExpression, "'messageGroupIdExpression' must not be null"); + this.messageGroupIdExpression = messageGroupIdExpression; + } + + public void setMessageDeduplicationId(String messageDeduplicationId) { + setMessageDeduplicationIdExpression(new LiteralExpression(messageDeduplicationId)); + } + + public void setMessageDeduplicationIdExpressionString(String messageDeduplicationIdExpression) { + setMessageDeduplicationIdExpression(EXPRESSION_PARSER.parseExpression(messageDeduplicationIdExpression)); + } + + public void setMessageDeduplicationIdExpression(Expression messageDeduplicationIdExpression) { + Assert.notNull(messageDeduplicationIdExpression, "'messageDeduplicationIdExpression' must not be null"); + this.messageDeduplicationIdExpression = messageDeduplicationIdExpression; + } + + public void setMessageConverter(MessageConverter messageConverter) { + this.messageConverter = messageConverter; + } + @Override protected void onInit() throws Exception { super.onInit(); - this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); + + if (this.messageConverter == null) { + this.messageConverter = new GenericMessageConverter(getConversionService()); + } } @Override - protected void handleMessageInternal(Message message) throws Exception { - String queue = message.getHeaders().get(AwsHeaders.QUEUE, String.class); - if (!StringUtils.hasText(queue) && this.queueExpression != null) { - queue = this.queueExpression.getValue(this.evaluationContext, message, String.class); + @SuppressWarnings("unchecked") + protected Future handleMessageToAws(Message message) { + Object payload = message.getPayload(); + if (payload instanceof SendMessageBatchRequest) { + AsyncHandler asyncHandler = + obtainAsyncHandler(message, (SendMessageBatchRequest) payload); + return this.amazonSqs.sendMessageBatchAsync((SendMessageBatchRequest) payload, asyncHandler); + } + + SendMessageRequest sendMessageRequest; + if (payload instanceof SendMessageRequest) { + sendMessageRequest = (SendMessageRequest) payload; + } + else { + String queue = message.getHeaders().get(AwsHeaders.QUEUE, String.class); + if (!StringUtils.hasText(queue) && this.queueExpression != null) { + queue = this.queueExpression.getValue(getEvaluationContext(), message, String.class); + } + Assert.state(queue != null, "'queue' must not be null for sending an SQS message. " + + "Consider configuring this handler with a 'queue'( or 'queueExpression') or supply an " + + "'aws_queue' message header"); + + String queueUrl = (String) this.destinationResolver.resolveDestination(queue); + String messageBody = (String) this.messageConverter.fromMessage(message, String.class); + sendMessageRequest = new SendMessageRequest(queueUrl, messageBody); + + if (this.delayExpression != null) { + Integer delay = this.delayExpression.getValue(getEvaluationContext(), message, Integer.class); + sendMessageRequest.setDelaySeconds(delay); + } + + if (this.messageGroupIdExpression != null) { + String messageGroupId = + this.messageGroupIdExpression.getValue(getEvaluationContext(), message, String.class); + sendMessageRequest.setMessageGroupId(messageGroupId); + } + + if (this.messageDeduplicationIdExpression != null) { + String messageDeduplicationId = + this.messageDeduplicationIdExpression.getValue(getEvaluationContext(), message, String.class); + sendMessageRequest.setMessageDeduplicationId(messageDeduplicationId); + } + } + AsyncHandler asyncHandler = + obtainAsyncHandler(message, sendMessageRequest); + return this.amazonSqs.sendMessageAsync(sendMessageRequest, asyncHandler); + } + + @Override + protected void additionalOnSuccessHeaders(AbstractIntegrationMessageBuilder messageBuilder, + AmazonWebServiceRequest request, Object result) { + + if (result instanceof SendMessageResult) { + SendMessageResult sendMessageResult = (SendMessageResult) result; + messageBuilder.setHeaderIfAbsent(AwsHeaders.MESSAGE_ID, sendMessageResult.getMessageId()); + messageBuilder.setHeaderIfAbsent(AwsHeaders.SEQUENCE_NUMBER, sendMessageResult.getSequenceNumber()); } - Assert.state(queue != null, "'queue' must not be null for sending an SQS message. " + - "Consider configuring this handler with a 'queue'( or 'queueExpression') or supply an " + - "'aws_queue' message header"); - this.template.send(queue, message); } } diff --git a/src/main/java/org/springframework/integration/aws/support/AwsHeaders.java b/src/main/java/org/springframework/integration/aws/support/AwsHeaders.java index b541cd0..8f1849b 100644 --- a/src/main/java/org/springframework/integration/aws/support/AwsHeaders.java +++ b/src/main/java/org/springframework/integration/aws/support/AwsHeaders.java @@ -65,11 +65,6 @@ public abstract class AwsHeaders { */ public static final String SNS_MESSAGE_TYPE = PREFIX + "snsMessageType"; - /** - * The {@value SNS_PUBLISHED_MESSAGE_ID} header for message published over SNS. - */ - public static final String SNS_PUBLISHED_MESSAGE_ID = PREFIX + "snsPublishedMessageId"; - /** * The {@value SHARD} header to represent Kinesis shardId. */ @@ -101,7 +96,7 @@ public abstract class AwsHeaders { public static final String PARTITION_KEY = PREFIX + "partitionKey"; /** - * The {@value SEQUENCE_NUMBER} header for sending data to Kinesis. + * The {@value SEQUENCE_NUMBER} header for sending data to SQS/Kinesis. */ public static final String SEQUENCE_NUMBER = PREFIX + "sequenceNumber"; diff --git a/src/main/resources/org/springframework/integration/aws/config/spring-integration-aws-2.0.xsd b/src/main/resources/org/springframework/integration/aws/config/spring-integration-aws-2.0.xsd index 403e701..ec0629f 100644 --- a/src/main/resources/org/springframework/integration/aws/config/spring-integration-aws-2.0.xsd +++ b/src/main/resources/org/springframework/integration/aws/config/spring-integration-aws-2.0.xsd @@ -248,9 +248,9 @@ - - - + + + @@ -303,7 +303,7 @@ + type="org.springframework.integration.file.filters.FileListFilter"/> @@ -357,22 +357,22 @@ - + - + - + + type="org.springframework.integration.file.remote.session.SessionFactory"/> @@ -383,10 +383,10 @@ - - Allows you to provide remote file/directory - separator character. DEFAULT: '/' - + + Allows you to provide remote file/directory + separator character. DEFAULT: '/' + @@ -428,7 +428,7 @@ - + @@ -450,6 +450,92 @@ + + + + + + Boolean value to indicate whether the target AWS operation should be performed async (default) + or sync manner. + + + + + + + + + + The timeout in milliseconds to wait for AWS response in sync mode. + Defaults to 10 seconds. + Mutually exclusive with 'send-timeout-expression'. + + + + + + + A SpEL expression that resolves a timeout in milliseconds at runtime + to wait for AWS response in sync mode. + Mutually exclusive with 'send-timeout'. + + + + + + + + + + + + The reference to the 'org.springframework.integration.support.ErrorMessageStrategy' bean. + Defaults to 'DefaultErrorMessageStrategy'. + + + + + + + + + + + + The message channel to send error messages in the async mode. + + + + + + + + + + + + The message channel to send confirmation messages from the callback in the async mode. + + + + + + + + + + + + Asynchronous callback handler for events in the lifecycle of the request. Users can provide an + implementation of the callback methods in this interface to receive notification of successful or + unsuccessful completion of the operation. + By default successful reply is sent to the 'success-channel' and error message to the + 'failure-channel' if they are provided. + + + + + @@ -471,6 +557,7 @@ Mutually exclusive with 'queue-expression'. This attribute isn't mandatory and the queue can be specified in message headers with the 'AwsHeaders.QUEUE' header name. + Mutually exclusive with 'queue-expression'. @@ -482,24 +569,86 @@ Mutually exclusive with 'queue'. This attribute isn't mandatory and the queue can be specified in message headers with the 'AwsHeaders.QUEUE' header name. + Mutually exclusive with 'queue'. - + - A bean reference to the QueueMessagingTemplate that this class is supposed to use. - This attribute is mutually exclusive with the 'sqs' and 'resource-id-resolver' - attributes. + The length of time, in seconds, for which to delay a specific message. + Valid values: 0 to 900. Maximum: 15 minutes. + Messages with a positive delay value become available for processing after the delay + period is finished. + If not specified, the default value for the queue applies. + Mutually exclusive with 'delay-expression'. + + + + + + + A SpEL expression that resolves to the length of time, in seconds, + for which to delay a specific message. + Mutually exclusive with 'delay'. + + + + + + + The tag that specifies that a message belongs to a specific message group. + Messages that belong to the same message group are processed in a FIFO manner + (however, messages in different message groups might be processed out of order). + To interleave multiple ordered streams within a single queue, use 'MessageGroupId' + values (for example, session data for multiple users). + In this scenario, multiple readers can process the queue, but the session data + of each user is processed in a FIFO fashion. + Mutually exclusive with 'message-group-id-expression'. + + + + + + + A SpEL expression that resolves a 'MessageGroupId' token at runtime. + Mutually exclusive with 'message-group-id'. + + + + + + + The token used for deduplication of sent messages. + If a message with a particular 'MessageDeduplicationId' is sent successfully, + any messages sent with the same 'MessageDeduplicationId' are accepted successfully + but aren't delivered during the 5-minute deduplication interval. + Mutually exclusive with 'message-deduplication-id-expression'. + + + + + + + A SpEL expression that resolves a 'MessageDeduplicationId' token at runtime. + Mutually exclusive with 'message-deduplication-id'. + + + + + + + A bean reference to the MessageConverter. + type="org.springframework.messaging.converter.MessageConverter"/> + @@ -725,70 +874,7 @@ - - - - - - - - - - Defines an outbound SNS Channel Adapter for publishing messages to the topic. - - - - - - - - - - - - - Identifies the request channel attached to this gateway. - - - - - - - - - - - - Identifies the reply channel attached to this - gateway. - - - - - - - - - - + @@ -801,7 +887,7 @@ Base type for the 'sqs-message-driven-channel-adapter' and 'sqs-outbound-channel-adapter' elements. - + The 'com.amazonaws.services.sqs.AmazonSQS' bean reference. diff --git a/src/test/java/org/springframework/integration/aws/config/xml/SnsOutboundChannelAdapterParserTests-context.xml b/src/test/java/org/springframework/integration/aws/config/xml/SnsOutboundChannelAdapterParserTests-context.xml index ce69321..2b30d6d 100644 --- a/src/test/java/org/springframework/integration/aws/config/xml/SnsOutboundChannelAdapterParserTests-context.xml +++ b/src/test/java/org/springframework/integration/aws/config/xml/SnsOutboundChannelAdapterParserTests-context.xml @@ -8,34 +8,39 @@ http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd"> - + + + + + + + + + + + + + - - - - diff --git a/src/test/java/org/springframework/integration/aws/config/xml/SnsOutboundChannelAdapterParserTests.java b/src/test/java/org/springframework/integration/aws/config/xml/SnsOutboundChannelAdapterParserTests.java index 816b3b2..0cda343 100644 --- a/src/test/java/org/springframework/integration/aws/config/xml/SnsOutboundChannelAdapterParserTests.java +++ b/src/test/java/org/springframework/integration/aws/config/xml/SnsOutboundChannelAdapterParserTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2016-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,18 +18,14 @@ package org.springframework.integration.aws.config.xml; import static org.assertj.core.api.Assertions.assertThat; -import java.util.List; - import org.junit.Test; import org.junit.runner.RunWith; -import org.springframework.aop.support.AopUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.cloud.aws.core.env.ResourceIdResolver; -import org.springframework.expression.Expression; import org.springframework.integration.endpoint.AbstractEndpoint; -import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice; +import org.springframework.integration.support.ErrorMessageStrategy; import org.springframework.integration.test.util.TestUtils; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; @@ -37,7 +33,8 @@ import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; -import com.amazonaws.services.sns.AmazonSNS; +import com.amazonaws.handlers.AsyncHandler; +import com.amazonaws.services.sns.AmazonSNSAsync; /** * @author Artem Bilan @@ -48,18 +45,14 @@ import com.amazonaws.services.sns.AmazonSNS; public class SnsOutboundChannelAdapterParserTests { @Autowired - private AmazonSNS amazonSns; - - @Autowired - @Qualifier("defaultAdapter") - private MessageChannel defaultAdapterChannel; + private AmazonSNSAsync amazonSns; @Autowired @Qualifier("errorChannel") private MessageChannel errorChannel; @Autowired - @Qualifier("defaultAdapter.adapter") + @Qualifier("defaultAdapter") private AbstractEndpoint defaultAdapter; @Autowired @@ -70,29 +63,24 @@ public class SnsOutboundChannelAdapterParserTests { @Qualifier("notificationChannel") private MessageChannel notificationChannel; - @Autowired - @Qualifier("snsGateway") - private AbstractEndpoint snsGateway; - - @Autowired - @Qualifier("snsGateway.handler") - private MessageHandler snsGatewayHandler; - @Autowired private ResourceIdResolver resourceIdResolver; + @Autowired + private ErrorMessageStrategy errorMessageStrategy; + + @Autowired + private AsyncHandler asyncHandler; + + @Autowired + private MessageChannel successChannel; + @Test - public void testSnsOutboundChannelAdapterDefaultParser() throws Exception { + public void testSnsOutboundChannelAdapterDefaultParser() { Object handler = TestUtils.getPropertyValue(this.defaultAdapter, "handler"); - assertThat(AopUtils.isAopProxy(handler)).isFalse(); - - assertThat(handler).isSameAs(this.defaultAdapterHandler); - - assertThat(TestUtils.getPropertyValue(handler, "adviceChain", List.class).get(0)) - .isInstanceOf(RequestHandlerRetryAdvice.class); assertThat(TestUtils.getPropertyValue(this.defaultAdapter, "inputChannel")) - .isSameAs(this.defaultAdapterChannel); + .isSameAs(this.notificationChannel); assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "amazonSns")).isSameAs(this.amazonSns); assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "evaluationContext")).isNotNull(); @@ -101,30 +89,28 @@ public class SnsOutboundChannelAdapterParserTests { assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "bodyExpression")).isNull(); assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "resourceIdResolver")) .isSameAs(this.resourceIdResolver); - } + assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "failureChannel")) + .isSameAs(this.errorChannel); - @Test - public void testSnsOutboundGatewayParser() { - assertThat(TestUtils.getPropertyValue(this.snsGateway, "inputChannel")).isSameAs(this.notificationChannel); - assertThat(TestUtils.getPropertyValue(this.snsGateway, "handler")).isSameAs(this.snsGatewayHandler); - assertThat(TestUtils.getPropertyValue(this.snsGateway, "autoStartup", Boolean.class)).isFalse(); - assertThat(TestUtils.getPropertyValue(this.snsGateway, "phase", Integer.class)).isEqualTo(201); - assertThat(TestUtils.getPropertyValue(this.snsGatewayHandler, "produceReply", Boolean.class)).isTrue(); - assertThat(TestUtils.getPropertyValue(this.snsGatewayHandler, "outputChannel")).isSameAs(this.errorChannel); - assertThat(TestUtils.getPropertyValue(this.snsGatewayHandler, "resourceIdResolver")) + assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "resourceIdResolver")) .isSameAs(this.resourceIdResolver); - assertThat(TestUtils.getPropertyValue(this.snsGatewayHandler, "amazonSns")).isSameAs(this.amazonSns); - assertThat(TestUtils.getPropertyValue(this.snsGatewayHandler, "evaluationContext")).isNotNull(); - assertThat(TestUtils.getPropertyValue(this.snsGatewayHandler, "topicArnExpression", Expression.class) - .getExpressionString()) - .isEqualTo("foo"); - assertThat(TestUtils.getPropertyValue(this.snsGatewayHandler, "subjectExpression", Expression.class) - .getExpressionString()) - .isEqualTo("bar"); - assertThat(TestUtils.getPropertyValue(this.snsGatewayHandler, "bodyExpression", Expression.class) - .getExpressionString()) - .isEqualTo("payload.toUpperCase()"); + assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "outputChannel")) + .isSameAs(this.successChannel); + + assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "errorMessageStrategy")) + .isSameAs(this.errorMessageStrategy); + + assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "asyncHandler")) + .isSameAs(this.asyncHandler); + + assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "sync", Boolean.class)) + .isFalse(); + + assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, + "sendTimeoutExpression.literalValue")) + .isEqualTo("202"); + } } diff --git a/src/test/java/org/springframework/integration/aws/config/xml/SqsMessageHandlerParserTests-context.xml b/src/test/java/org/springframework/integration/aws/config/xml/SqsMessageHandlerParserTests-context.xml index 4c84db0..b6de975 100644 --- a/src/test/java/org/springframework/integration/aws/config/xml/SqsMessageHandlerParserTests-context.xml +++ b/src/test/java/org/springframework/integration/aws/config/xml/SqsMessageHandlerParserTests-context.xml @@ -2,9 +2,11 @@ @@ -13,12 +15,38 @@ + + + + + + + + + + + + + + + + diff --git a/src/test/java/org/springframework/integration/aws/config/xml/SqsMessageHandlerParserTests.java b/src/test/java/org/springframework/integration/aws/config/xml/SqsMessageHandlerParserTests.java index 4fe9f28..5e801e0 100644 --- a/src/test/java/org/springframework/integration/aws/config/xml/SqsMessageHandlerParserTests.java +++ b/src/test/java/org/springframework/integration/aws/config/xml/SqsMessageHandlerParserTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2016 the original author or authors. + * Copyright 2015-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,13 +25,16 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.cloud.aws.core.env.ResourceIdResolver; import org.springframework.integration.endpoint.EventDrivenConsumer; +import org.springframework.integration.support.ErrorMessageStrategy; import org.springframework.integration.test.util.TestUtils; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.converter.MessageConverter; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import com.amazonaws.handlers.AsyncHandler; import com.amazonaws.services.sqs.AmazonSQS; /** @@ -48,9 +51,24 @@ public class SqsMessageHandlerParserTests { @Autowired private ResourceIdResolver resourceIdResolver; + @Autowired + private ErrorMessageStrategy errorMessageStrategy; + + @Autowired + private MessageConverter messageConverter; + + @Autowired + private AsyncHandler asyncHandler; + @Autowired private MessageChannel errorChannel; + @Autowired + private MessageChannel failureChannel; + + @Autowired + private MessageChannel successChannel; + @Autowired private EventDrivenConsumer sqsOutboundChannelAdapter; @@ -60,10 +78,10 @@ public class SqsMessageHandlerParserTests { @Test public void testSqsMessageHandlerParser() { - assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler, "template.amazonSqs")) + assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler, "amazonSqs")) .isSameAs(this.amazonSqs); assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler, - "template.destinationResolver.targetDestinationResolver.resourceIdResolver")) + "destinationResolver.resourceIdResolver")) .isSameAs(this.resourceIdResolver); assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler, "queueExpression.literalValue")) @@ -75,6 +93,40 @@ public class SqsMessageHandlerParserTests { .isSameAs(this.errorChannel); assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapter, "handler")) .isSameAs(this.sqsOutboundChannelAdapterHandler); + + assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler, + "delayExpression.expression")) + .isEqualTo("'200'"); + + assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler, + "messageDeduplicationIdExpression.literalValue")) + .isEqualTo("foo"); + + assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler, + "messageGroupIdExpression.expression")) + .isEqualTo("'bar'"); + + assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler, "failureChannel")) + .isSameAs(this.failureChannel); + + assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler, "outputChannel")) + .isSameAs(this.successChannel); + + assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler, "messageConverter")) + .isSameAs(this.messageConverter); + + assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler, "errorMessageStrategy")) + .isSameAs(this.errorMessageStrategy); + + assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler, "asyncHandler")) + .isSameAs(this.asyncHandler); + + assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler, "sync", Boolean.class)) + .isFalse(); + + assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler, + "sendTimeoutExpression.literalValue")) + .isEqualTo("202"); } } diff --git a/src/test/java/org/springframework/integration/aws/config/xml/SqsOutboundChannelAdapterParserTests-context-good.xml b/src/test/java/org/springframework/integration/aws/config/xml/SqsOutboundChannelAdapterParserTests-context-good.xml index 7f8d726..24b63c2 100644 --- a/src/test/java/org/springframework/integration/aws/config/xml/SqsOutboundChannelAdapterParserTests-context-good.xml +++ b/src/test/java/org/springframework/integration/aws/config/xml/SqsOutboundChannelAdapterParserTests-context-good.xml @@ -13,8 +13,6 @@ - - - - diff --git a/src/test/java/org/springframework/integration/aws/config/xml/SqsOutboundChannelAdapterParserTests.java b/src/test/java/org/springframework/integration/aws/config/xml/SqsOutboundChannelAdapterParserTests.java index f5e401a..e5712f6 100644 --- a/src/test/java/org/springframework/integration/aws/config/xml/SqsOutboundChannelAdapterParserTests.java +++ b/src/test/java/org/springframework/integration/aws/config/xml/SqsOutboundChannelAdapterParserTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2016 the original author or authors. + * Copyright 2015-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,15 +16,10 @@ package org.springframework.integration.aws.config.xml; -import static org.assertj.core.api.Assertions.assertThat; - import org.junit.Test; import org.springframework.beans.factory.BeanDefinitionStoreException; -import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; -import org.springframework.integration.aws.outbound.SqsMessageHandler; -import org.springframework.integration.test.util.TestUtils; /** * @author Rahul Pilani @@ -52,21 +47,4 @@ public class SqsOutboundChannelAdapterParserTests { new ClassPathXmlApplicationContext("SqsOutboundChannelAdapterParserTests-context-bad4.xml", getClass()).close(); } - @Test - public void test_happy_path_with_queue_messaging_template() { - ConfigurableApplicationContext applicationContext = - new ClassPathXmlApplicationContext("SqsOutboundChannelAdapterParserTests-context-good.xml", getClass()); - - SqsMessageHandler handlerWithTemplate = - applicationContext.getBean("sqsOutboundChannelAdapterWithQueueMessagingTemplate.handler", - SqsMessageHandler.class); - assertThat(TestUtils.getPropertyValue(handlerWithTemplate, "template")).isNotNull(); - - SqsMessageHandler handlerWithSqs = applicationContext.getBean("sqsOutboundChannelAdapterWithSqs.handler", - SqsMessageHandler.class); - assertThat(TestUtils.getPropertyValue(handlerWithSqs, "template")).isNotNull(); - - applicationContext.close(); - } - } diff --git a/src/test/java/org/springframework/integration/aws/outbound/AbstractSqsMessageHandlerTests.java b/src/test/java/org/springframework/integration/aws/outbound/AbstractSqsMessageHandlerTests.java deleted file mode 100644 index f8c76d1..0000000 --- a/src/test/java/org/springframework/integration/aws/outbound/AbstractSqsMessageHandlerTests.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright 2015-2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.aws.outbound; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import org.junit.Test; -import org.mockito.ArgumentCaptor; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.expression.Expression; -import org.springframework.expression.spel.standard.SpelExpressionParser; -import org.springframework.integration.aws.support.AwsHeaders; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.MessageHandlingException; -import org.springframework.messaging.support.MessageBuilder; -import org.springframework.test.annotation.DirtiesContext; - -import com.amazonaws.services.sqs.AmazonSQS; -import com.amazonaws.services.sqs.model.SendMessageRequest; - -/** - * Parent class to contain tests that exercise the SqsMessageHandler class - * - * Subclasses can instantiate SqsMessageHandler in their own way. - * - * @author Rahul Pilani - * @author Artem Bilan - */ -@DirtiesContext -public abstract class AbstractSqsMessageHandlerTests { - - @Autowired - protected AmazonSQS amazonSqs; - - @Autowired - protected MessageChannel sqsSendChannel; - - @Autowired - protected SqsMessageHandler sqsMessageHandler; - - @Test - public void testSqsMessageHandler() { - Message message = MessageBuilder.withPayload("message").build(); - try { - this.sqsSendChannel.send(message); - } - catch (Exception e) { - assertThat(e).isInstanceOf(MessageHandlingException.class); - assertThat(e.getCause()).isInstanceOf(IllegalStateException.class); - } - - this.sqsMessageHandler.setQueue("foo"); - this.sqsSendChannel.send(message); - ArgumentCaptor sendMessageRequestArgumentCaptor = - ArgumentCaptor.forClass(SendMessageRequest.class); - verify(this.amazonSqs).sendMessage(sendMessageRequestArgumentCaptor.capture()); - assertThat(sendMessageRequestArgumentCaptor.getValue().getQueueUrl()) - .isEqualTo("http://queue-url.com/foo"); - - message = MessageBuilder.withPayload("message").setHeader(AwsHeaders.QUEUE, "bar").build(); - this.sqsSendChannel.send(message); - verify(this.amazonSqs, times(2)).sendMessage(sendMessageRequestArgumentCaptor.capture()); - assertThat(sendMessageRequestArgumentCaptor.getValue().getQueueUrl()) - .isEqualTo("http://queue-url.com/bar"); - - SpelExpressionParser spelExpressionParser = new SpelExpressionParser(); - Expression expression = spelExpressionParser.parseExpression("headers.foo"); - this.sqsMessageHandler.setQueueExpression(expression); - message = MessageBuilder.withPayload("message").setHeader("foo", "baz").build(); - this.sqsSendChannel.send(message); - verify(this.amazonSqs, times(3)).sendMessage(sendMessageRequestArgumentCaptor.capture()); - assertThat(sendMessageRequestArgumentCaptor.getValue().getQueueUrl()) - .isEqualTo("http://queue-url.com/baz"); - } - -} diff --git a/src/test/java/org/springframework/integration/aws/outbound/SnsMessageHandlerTests.java b/src/test/java/org/springframework/integration/aws/outbound/SnsMessageHandlerTests.java index f444ddb..486fa3c 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/SnsMessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/SnsMessageHandlerTests.java @@ -39,11 +39,14 @@ import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.PollableChannel; +import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; -import com.amazonaws.services.sns.AmazonSNS; +import com.amazonaws.handlers.AsyncHandler; +import com.amazonaws.services.sns.AmazonSNSAsync; import com.amazonaws.services.sns.model.PublishRequest; import com.amazonaws.services.sns.model.PublishResult; @@ -61,28 +64,29 @@ public class SnsMessageHandlerTests { private MessageChannel sendToSnsChannel; @Autowired - private AmazonSNS amazonSNS; + private AmazonSNSAsync amazonSNS; + + @Autowired + private PollableChannel resultChannel; @Test + @SuppressWarnings("unchecked") public void testSnsMessageHandler() { SnsBodyBuilder payload = SnsBodyBuilder.withDefault("foo") .forProtocols("{\"foo\" : \"bar\"}", "sms"); - QueueChannel replyChannel = new QueueChannel(); - Message message = MessageBuilder.withPayload(payload) .setHeader("topic", "topic") .setHeader("subject", "subject") - .setReplyChannel(replyChannel) .build(); this.sendToSnsChannel.send(message); - Message reply = replyChannel.receive(1000); + Message reply = this.resultChannel.receive(10000); assertThat(reply).isNotNull(); ArgumentCaptor captor = ArgumentCaptor.forClass(PublishRequest.class); - verify(this.amazonSNS).publish(captor.capture()); + verify(this.amazonSNS).publishAsync(captor.capture(), any(AsyncHandler.class)); PublishRequest publishRequest = captor.getValue(); @@ -92,9 +96,9 @@ public class SnsMessageHandlerTests { assertThat(publishRequest.getMessage()) .isEqualTo("{\"default\":\"foo\",\"sms\":\"{\\\"foo\\\" : \\\"bar\\\"}\"}"); - assertThat(reply.getHeaders().get(AwsHeaders.SNS_PUBLISHED_MESSAGE_ID)).isEqualTo("111"); + assertThat(reply.getHeaders().get(AwsHeaders.MESSAGE_ID)).isEqualTo("111"); assertThat(reply.getHeaders().get(AwsHeaders.TOPIC)).isEqualTo("topic"); - assertThat(reply.getPayload()).isSameAs(publishRequest); + assertThat(reply.getPayload()).isSameAs(payload); } @Configuration @@ -102,23 +106,35 @@ public class SnsMessageHandlerTests { public static class ContextConfiguration { @Bean - public AmazonSNS amazonSNS() { - AmazonSNS mock = mock(AmazonSNS.class); + @SuppressWarnings("unchecked") + public AmazonSNSAsync amazonSNS() { + AmazonSNSAsync mock = mock(AmazonSNSAsync.class); - willAnswer(invocation -> new PublishResult().withMessageId("111")) + willAnswer(invocation -> { + PublishResult publishResult = new PublishResult().withMessageId("111"); + AsyncHandler asyncHandler = invocation.getArgument(1); + asyncHandler.onSuccess(invocation.getArgument(0), publishResult); + return new AsyncResult<>(publishResult); + }) .given(mock) - .publish(any(PublishRequest.class)); + .publishAsync(any(PublishRequest.class), any(AsyncHandler.class)); return mock; } + @Bean + public PollableChannel resultChannel() { + return new QueueChannel(); + } + @Bean @ServiceActivator(inputChannel = "sendToSnsChannel") public MessageHandler snsMessageHandler() { - SnsMessageHandler snsMessageHandler = new SnsMessageHandler(amazonSNS(), true); + SnsMessageHandler snsMessageHandler = new SnsMessageHandler(amazonSNS()); snsMessageHandler.setTopicArnExpression(PARSER.parseExpression("headers.topic")); snsMessageHandler.setSubjectExpression(PARSER.parseExpression("headers.subject")); snsMessageHandler.setBodyExpression(PARSER.parseExpression("payload")); + snsMessageHandler.setOutputChannel(resultChannel()); return snsMessageHandler; } diff --git a/src/test/java/org/springframework/integration/aws/outbound/SqsMessageHandlerTests.java b/src/test/java/org/springframework/integration/aws/outbound/SqsMessageHandlerTests.java index 49f1e09..889d198 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/SqsMessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/SqsMessageHandlerTests.java @@ -16,23 +16,38 @@ package org.springframework.integration.aws.outbound; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.BDDMockito.willAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.expression.Expression; +import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.aws.support.AwsHeaders; import org.springframework.integration.config.EnableIntegration; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessageHandlingException; +import org.springframework.messaging.support.MessageBuilder; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import com.amazonaws.handlers.AsyncHandler; import com.amazonaws.services.sqs.AmazonSQSAsync; import com.amazonaws.services.sqs.model.GetQueueUrlRequest; import com.amazonaws.services.sqs.model.GetQueueUrlResult; +import com.amazonaws.services.sqs.model.SendMessageRequest; /** * Instantiating SqsMessageHandler using amazonSqs. @@ -42,8 +57,57 @@ import com.amazonaws.services.sqs.model.GetQueueUrlResult; */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration -public class SqsMessageHandlerTests extends AbstractSqsMessageHandlerTests { +public class SqsMessageHandlerTests { + @Autowired + protected AmazonSQSAsync amazonSqs; + + @Autowired + protected MessageChannel sqsSendChannel; + + @Autowired + protected SqsMessageHandler sqsMessageHandler; + + @Test + @SuppressWarnings("unchecked") + public void testSqsMessageHandler() { + Message message = MessageBuilder.withPayload("message").build(); + try { + this.sqsSendChannel.send(message); + } + catch (Exception e) { + assertThat(e).isInstanceOf(MessageHandlingException.class); + assertThat(e.getCause()).isInstanceOf(IllegalStateException.class); + } + + this.sqsMessageHandler.setQueue("foo"); + this.sqsSendChannel.send(message); + ArgumentCaptor sendMessageRequestArgumentCaptor = + ArgumentCaptor.forClass(SendMessageRequest.class); + verify(this.amazonSqs) + .sendMessageAsync(sendMessageRequestArgumentCaptor.capture(), any(AsyncHandler.class)); + assertThat(sendMessageRequestArgumentCaptor.getValue().getQueueUrl()) + .isEqualTo("http://queue-url.com/foo"); + + message = MessageBuilder.withPayload("message").setHeader(AwsHeaders.QUEUE, "bar").build(); + this.sqsSendChannel.send(message); + verify(this.amazonSqs, times(2)) + .sendMessageAsync(sendMessageRequestArgumentCaptor.capture(), any(AsyncHandler.class)); + + assertThat(sendMessageRequestArgumentCaptor.getValue().getQueueUrl()) + .isEqualTo("http://queue-url.com/bar"); + + SpelExpressionParser spelExpressionParser = new SpelExpressionParser(); + Expression expression = spelExpressionParser.parseExpression("headers.foo"); + this.sqsMessageHandler.setQueueExpression(expression); + message = MessageBuilder.withPayload("message").setHeader("foo", "baz").build(); + this.sqsSendChannel.send(message); + verify(this.amazonSqs, times(3)) + .sendMessageAsync(sendMessageRequestArgumentCaptor.capture(), any(AsyncHandler.class)); + + assertThat(sendMessageRequestArgumentCaptor.getValue().getQueueUrl()) + .isEqualTo("http://queue-url.com/baz"); + } @Configuration @EnableIntegration diff --git a/src/test/java/org/springframework/integration/aws/outbound/SqsMessageHandlerWithQueueMessagingTemplateTests.java b/src/test/java/org/springframework/integration/aws/outbound/SqsMessageHandlerWithQueueMessagingTemplateTests.java deleted file mode 100644 index 5b49027..0000000 --- a/src/test/java/org/springframework/integration/aws/outbound/SqsMessageHandlerWithQueueMessagingTemplateTests.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright 2015-2017 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.aws.outbound; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.BDDMockito.willAnswer; -import static org.mockito.Mockito.mock; - -import org.junit.runner.RunWith; - -import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.integration.annotation.ServiceActivator; -import org.springframework.integration.config.EnableIntegration; -import org.springframework.messaging.MessageHandler; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -import com.amazonaws.services.sqs.AmazonSQSAsync; -import com.amazonaws.services.sqs.model.GetQueueUrlRequest; -import com.amazonaws.services.sqs.model.GetQueueUrlResult; - -/** - * Instantiating SqsMessageHandler using QueueMessagingTemplate. - * - * @author Rahul Pilani - */ -@RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration -public class SqsMessageHandlerWithQueueMessagingTemplateTests extends AbstractSqsMessageHandlerTests { - - - @Configuration - @EnableIntegration - public static class ContextConfiguration { - - @Bean - public AmazonSQSAsync amazonSqs() { - AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); - - willAnswer(invocation -> { - GetQueueUrlRequest getQueueUrlRequest = (GetQueueUrlRequest) invocation.getArguments()[0]; - GetQueueUrlResult queueUrl = new GetQueueUrlResult(); - queueUrl.setQueueUrl("http://queue-url.com/" + getQueueUrlRequest.getQueueName()); - return queueUrl; - }) - .given(amazonSqs) - .getQueueUrl(any(GetQueueUrlRequest.class)); - - return amazonSqs; - } - - @Bean - public QueueMessagingTemplate queueMessagingTemplate() { - return new QueueMessagingTemplate(amazonSqs()); - } - - @Bean - @ServiceActivator(inputChannel = "sqsSendChannel") - public MessageHandler sqsMessageHandler() { - return new SqsMessageHandler(queueMessagingTemplate()); - } - - } - -}