GH-62: Align Message Handlers for common API

Fixes: spring-projects/spring-integration-aws#62

* Make `SnsMessageHandler extends AbstractAwsMessageHandler`
* Remove SNS Outbound Gateway variant since `SnsMessageHandler` covers
that part via `successChannel` and `failureChannel`
* Fix XSD for SQS and SNS
* Fix SQS and SNS tests according their logic changes
* Fix README for new changes
This commit is contained in:
Artem Bilan
2017-11-22 12:13:32 -05:00
parent b4c7690cf1
commit 2e63b2c682
23 changed files with 903 additions and 770 deletions

View File

@@ -260,12 +260,9 @@ background components and core configuration, please, refer to the documentation
### Outbound Channel Adapter
The SQS Outbound Channel Adapter is presented by the `SqsMessageHandler` implementation
(`<int-aws:sqs-outbound-channel-adapter>`) and allows to send message to the SQS `queue` with provided `AmazonSQS`
client. An SQS queue can be configured explicitly on the adapter (using
`org.springframework.integration.expression.ValueExpression`) or as a SpEL `Expression`, which is evaluated against
request message as a root object of evaluation context. In addition the `queue` can be extracted from the message
headers under `AwsHeaders.QUEUE`.
The SQS Outbound Channel Adapter is presented by the `SqsMessageHandler` implementation (`<int-aws:sqs-outbound-channel-adapter>`) and allows to send message to the SQS `queue` with provided `AmazonSQS` client.
An SQS queue can be configured explicitly on the adapter (using `org.springframework.integration.expression.ValueExpression`) or as a SpEL `Expression`, which is evaluated against request message as a root object of evaluation context.
In addition the `queue` can be extracted from the message headers under `AwsHeaders.QUEUE`.
The Java Configuration is pretty simple:
@@ -273,18 +270,10 @@ The Java Configuration is pretty simple:
@SpringBootApplication
public static class MyConfiguration {
@Autowired
private AmazonSQSAsync amazonSqs;
@Bean
public QueueMessagingTemplate queueMessagingTemplate() {
return new QueueMessagingTemplate(this.amazonSqs);
}
@Bean
@ServiceActivator(inputChannel = "sqsSendChannel")
public MessageHandler sqsMessageHandler() {
return new SqsMessageHandler(queueMessagingTemplate());
return new SqsMessageHandler(AmazonSQSAsync amazonSqs);
}
}
@@ -306,7 +295,7 @@ The SQS Inbound Channel Adapter is a `message-driven` implementation for the `Me
`SqsMessageDrivenChannelAdapter`. This channel adapter is based on the
`org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer` to receive messages from the
provided `queues` in async manner and send an enhanced Spring Integration Message to the provided `MessageChannel`.
The enhancements includes `AwsHeaders.MESSAGE_ID`, `AwsHeaders.RECEIPT_HANDLE` and `AwsHeaders.QUEUE` message headers.
The enhancements includes `AwsHeaders.MESSAGE_ID`, `AwsHeaders.RECEIPT_HANDLE` and `AwsHeaders.RECEIVED_QUEUE` message headers.
The Java Configuration is pretty simple:
@@ -477,26 +466,6 @@ The XML variant may look like:
body-expression="payload.toUpperCase()"/>
````
### Outbound Gateway
The `<int-aws:sns-outbound-gateway>` is fully similar to the one-way channel adapter.
The only difference that in gateway mode the `SnsMessageHandler` produces the reply `Message` as:
````java
return getMessageBuilderFactory()
.withPayload(publishRequest)
.setHeader(AwsHeaders.TOPIC, publishRequest.getTopicArn())
.setHeader(AwsHeaders.SNS_PUBLISHED_MESSAGE_ID, publishResult.getMessageId());
````
The reply from the `<int-aws:sns-outbound-gateway>` may be useful to track and correlate the SNS message
in the downstream flow.
With Java configuration there is just enough to use constructor of `SnsMessageHandler` with `produceReply` boolean
flag to `true` ot switch it to the gateway mode.
By default the `SnsMessageHandler` is one-way `MessageHandler`.
## Metadata Store for Amazon DynamoDB
The `DynamoDbMetaDataStore`, a `ConcurrentMetadataStore` implementation, is provided to keep the metadata for Spring Integration components in the distributed Amazon DynamoDB store.
@@ -555,6 +524,9 @@ This channel adapter can be configured with the `DynamoDbMetaDataStore` mentione
By default this adapter uses `DeserializingConverter` to convert `byte[]` from the `Record` data.
Can be specified as `null` with meaning no conversion and the target `Message` is sent with the `byte[]` payload.
Additional headers like `AwsHeaders.RECEIVED_STREAM`, `AwsHeaders.RECEIVED_PARTITION_KEY` and `AwsHeaders.RECEIVED_SEQUENCE_NUMBER` are populated to the message for downstream logic.
When `CheckpointMode.manual` is used the `Checkpointer` instance is populated to the `AwsHeaders.CHECKPOINTER` header for acknowledgment in the downstream logic manually.
The consumer group is included to the metadata store `key`.
When records are consumed, they are filtered by the last stored `lastCheckpoint` under the key as `[CONSUMER_GROUP]:[STREAM]:[SHARD_ID]`.

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2013-2016 the original author or authors.
* Copyright 2013-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@ import org.springframework.integration.config.xml.AbstractIntegrationNamespaceHa
*
* @author Amol Nayak
* @author Artem Bilan
*
* @since 0.5
*/
public class AwsNamespaceHandler extends AbstractIntegrationNamespaceHandler {
@@ -37,7 +38,6 @@ public class AwsNamespaceHandler extends AbstractIntegrationNamespaceHandler {
registerBeanDefinitionParser("sqs-message-driven-channel-adapter", new SqsMessageDrivenChannelAdapterParser());
registerBeanDefinitionParser("sns-inbound-channel-adapter", new SnsInboundChannelAdapterParser());
registerBeanDefinitionParser("sns-outbound-channel-adapter", new SnsOutboundChannelAdapterParser());
registerBeanDefinitionParser("sns-outbound-gateway", new SnsOutboundGatewayParser());
}
}

View File

@@ -16,6 +16,14 @@
package org.springframework.integration.aws.config.xml;
import org.w3c.dom.Element;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.core.Conventions;
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
/**
* The utility class for the namespace parsers.
*
@@ -46,13 +54,20 @@ public final class AwsParserUtils {
*/
public static final String RESOURCE_ID_RESOLVER_REF = "resource-id-resolver";
/**
* The 'queue-messaging-template' reference attribute name.
*/
public static final String QUEUE_MESSAGING_TEMPLATE_REF = "queue-messaging-template";
private AwsParserUtils() {
super();
}
static void populateExpressionAttribute(String attributeName, BeanDefinitionBuilder builder,
Element element, ParserContext parserContext) {
BeanDefinition beanDefinition =
IntegrationNamespaceUtils.createExpressionDefinitionFromValueOrExpression(attributeName,
attributeName + "-expression", parserContext, element, false);
if (beanDefinition != null) {
builder.addPropertyValue(Conventions.attributeNameToPropertyName(attributeName) + "Expression",
beanDefinition);
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,9 +18,13 @@ package org.springframework.integration.aws.config.xml;
import org.w3c.dom.Element;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.integration.aws.outbound.SnsMessageHandler;
import org.springframework.integration.config.xml.AbstractOutboundChannelAdapterParser;
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
/**
* The parser for the {@code <int-aws:sns-outbound-channel-adapter>}.
@@ -31,12 +35,32 @@ public class SnsOutboundChannelAdapterParser extends AbstractOutboundChannelAdap
@Override
protected AbstractBeanDefinition parseConsumer(Element element, ParserContext parserContext) {
AbstractBeanDefinition beanDefinition =
new SnsOutboundGatewayParser()
.parseHandler(element, parserContext)
.getBeanDefinition();
beanDefinition.getConstructorArgumentValues().addIndexedArgumentValue(1, false);
return beanDefinition;
String sns = element.getAttribute(AwsParserUtils.SNS_REF);
BeanDefinitionBuilder builder =
BeanDefinitionBuilder.genericBeanDefinition(SnsMessageHandler.class)
.addConstructorArgReference(sns);
AwsParserUtils.populateExpressionAttribute("topic-arn", builder, element, parserContext);
AwsParserUtils.populateExpressionAttribute("subject", builder, element, parserContext);
BeanDefinition message =
IntegrationNamespaceUtils.createExpressionDefIfAttributeDefined("body-expression", element);
if (message != null) {
builder.addPropertyValue("bodyExpression", message);
}
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "resource-id-resolver");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "sync");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "error-message-strategy");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "failure-channel");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "async-handler");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "success-channel", "outputChannel");
AwsParserUtils.populateExpressionAttribute("send-timeout", builder, element, parserContext);
return builder.getBeanDefinition();
}
}

View File

@@ -1,73 +0,0 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.aws.config.xml;
import org.w3c.dom.Element;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.integration.aws.outbound.SnsMessageHandler;
import org.springframework.integration.config.xml.AbstractConsumerEndpointParser;
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
/**
* The parser for the {@code <int-aws:sns-outbound-gateway>}.
*
* @author Artem Bilan
*/
public class SnsOutboundGatewayParser extends AbstractConsumerEndpointParser {
@Override
protected String getInputChannelAttributeName() {
return "request-channel";
}
@Override
protected BeanDefinitionBuilder parseHandler(Element element, ParserContext parserContext) {
String sns = element.getAttribute(AwsParserUtils.SNS_REF);
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SnsMessageHandler.class)
.addConstructorArgReference(sns)
.addConstructorArgValue(true);
BeanDefinition topic = IntegrationNamespaceUtils.createExpressionDefinitionFromValueOrExpression("topic-arn",
"topic-arn-expression", parserContext, element, false);
if (topic != null) {
builder.addPropertyValue("topicArnExpression", topic);
}
BeanDefinition subject = IntegrationNamespaceUtils.createExpressionDefinitionFromValueOrExpression("subject",
"subject-expression", parserContext, element, false);
if (subject != null) {
builder.addPropertyValue("subjectExpression", subject);
}
BeanDefinition message =
IntegrationNamespaceUtils.createExpressionDefIfAttributeDefined("body-expression", element);
if (message != null) {
builder.addPropertyValue("bodyExpression", message);
}
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "reply-timeout", "sendTimeout");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "reply-channel", "outputChannel");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "resource-id-resolver");
return builder;
}
}

View File

@@ -58,11 +58,7 @@ public class SqsMessageDrivenChannelAdapterParser extends AbstractSingleBeanDefi
@Override
protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) {
String sqs = element.getAttribute(AwsParserUtils.SQS_REF);
if (!StringUtils.hasText(sqs)) {
parserContext.getReaderContext().error("'sqs' attribute is required.", element);
}
builder.addConstructorArgReference(sqs)
builder.addConstructorArgReference(element.getAttribute(AwsParserUtils.SQS_REF))
.addConstructorArgValue(element.getAttribute("queues"));
String channelName = element.getAttribute("channel");
if (!StringUtils.hasText(channelName)) {

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2015-2016 the original author or authors.
* Copyright 2015-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,7 +18,6 @@ package org.springframework.integration.aws.config.xml;
import org.w3c.dom.Element;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.xml.ParserContext;
@@ -39,37 +38,28 @@ public class SqsOutboundChannelAdapterParser extends AbstractOutboundChannelAdap
protected AbstractBeanDefinition parseConsumer(Element element, ParserContext parserContext) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SqsMessageHandler.class);
String template = element.getAttribute(AwsParserUtils.QUEUE_MESSAGING_TEMPLATE_REF);
boolean hasTemplate = StringUtils.hasText(template);
String sqs = element.getAttribute(AwsParserUtils.SQS_REF);
boolean hasSqs = StringUtils.hasText(sqs);
String resourceIdResolver = element.getAttribute(AwsParserUtils.RESOURCE_ID_RESOLVER_REF);
boolean hasResourceIdResolver = StringUtils.hasText(resourceIdResolver);
if (hasTemplate && (hasSqs || hasResourceIdResolver)) {
parserContext.getReaderContext().error(AwsParserUtils.QUEUE_MESSAGING_TEMPLATE_REF +
" should not be defined in conjunction with " + AwsParserUtils.SQS_REF
+ " or " + AwsParserUtils.RESOURCE_ID_RESOLVER_REF, element);
}
if (!hasTemplate && !hasSqs) {
parserContext.getReaderContext().error("One of " + AwsParserUtils.QUEUE_MESSAGING_TEMPLATE_REF + " or "
+ AwsParserUtils.SQS_REF + " must be defined.", element);
}
builder.addConstructorArgReference(element.getAttribute(AwsParserUtils.SQS_REF));
if (hasSqs) {
builder.addConstructorArgReference(sqs);
}
if (hasResourceIdResolver) {
builder.addConstructorArgReference(resourceIdResolver);
}
if (hasTemplate) {
builder.addConstructorArgReference(template);
}
BeanDefinition queue = IntegrationNamespaceUtils.createExpressionDefinitionFromValueOrExpression("queue",
"queue-expression", parserContext, element, false);
if (queue != null) {
builder.addPropertyValue("queueExpression", queue);
}
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "sync");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "message-converter");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "error-message-strategy");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "failure-channel");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "async-handler");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "success-channel", "outputChannel");
AwsParserUtils.populateExpressionAttribute("queue", builder, element, parserContext);
AwsParserUtils.populateExpressionAttribute("delay", builder, element, parserContext);
AwsParserUtils.populateExpressionAttribute("message-group-id", builder, element, parserContext);
AwsParserUtils.populateExpressionAttribute("message-deduplication-id", builder, element, parserContext);
AwsParserUtils.populateExpressionAttribute("send-timeout", builder, element, parserContext);
return builder.getBeanDefinition();
}

View File

@@ -0,0 +1,221 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.aws.outbound;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.integration.aws.support.AwsRequestFailureException;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.handler.AbstractMessageProducingHandler;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.handlers.AsyncHandler;
/**
* The base {@link AbstractMessageProducingHandler} for AWS services.
* Utilizes common logic ({@link AsyncHandler}, {@link ErrorMessageStrategy},
* {@code failureChannel} etc.) and message pre- and post-processing,
*
* @author Artem Bilan
*
* @since 2.0
*/
public abstract class AbstractAwsMessageHandler extends AbstractMessageProducingHandler {
protected static final long DEFAULT_SEND_TIMEOUT = 10000;
private AsyncHandler<? extends AmazonWebServiceRequest, ?> asyncHandler;
private EvaluationContext evaluationContext;
private boolean sync;
private Expression sendTimeoutExpression = new ValueExpression<>(DEFAULT_SEND_TIMEOUT);
private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();
private MessageChannel failureChannel;
private String failureChannelName;
public void setAsyncHandler(AsyncHandler<? extends AmazonWebServiceRequest, ?> asyncHandler) {
this.asyncHandler = asyncHandler;
}
protected AsyncHandler<? extends AmazonWebServiceRequest, ?> getAsyncHandler() {
return this.asyncHandler;
}
public void setSync(boolean sync) {
this.sync = sync;
}
protected boolean isSync() {
return this.sync;
}
public void setSendTimeout(long sendTimeout) {
setSendTimeoutExpression(new ValueExpression<>(sendTimeout));
}
public void setSendTimeoutExpressionString(String sendTimeoutExpression) {
setSendTimeoutExpression(EXPRESSION_PARSER.parseExpression(sendTimeoutExpression));
}
public void setSendTimeoutExpression(Expression sendTimeoutExpression) {
Assert.notNull(sendTimeoutExpression, "'sendTimeoutExpression' must not be null");
this.sendTimeoutExpression = sendTimeoutExpression;
}
protected Expression getSendTimeoutExpression() {
return this.sendTimeoutExpression;
}
/**
* Set the failure channel. After a failure on put, an {@link ErrorMessage} will be sent
* to this channel with a payload of a {@link AwsRequestFailureException} with the
* failed message and cause.
* @param failureChannel the failure channel.
*/
public void setFailureChannel(MessageChannel failureChannel) {
this.failureChannel = failureChannel;
}
/**
* Set the failure channel name. After a failure on put, an {@link ErrorMessage} will be
* sent to this channel name with a payload of a {@link AwsRequestFailureException}
* with the failed message and cause.
* @param failureChannelName the failure channel name.
*/
public void setFailureChannelName(String failureChannelName) {
this.failureChannelName = failureChannelName;
}
protected MessageChannel getFailureChannel() {
if (this.failureChannel != null) {
return this.failureChannel;
}
else if (this.failureChannelName != null) {
this.failureChannel = getChannelResolver().resolveDestination(this.failureChannelName);
return this.failureChannel;
}
return null;
}
public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
Assert.notNull(errorMessageStrategy, "'errorMessageStrategy' must not be null");
this.errorMessageStrategy = errorMessageStrategy;
}
protected ErrorMessageStrategy getErrorMessageStrategy() {
return this.errorMessageStrategy;
}
protected EvaluationContext getEvaluationContext() {
return this.evaluationContext;
}
@Override
protected void onInit() throws Exception {
super.onInit();
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
}
@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
Future<?> resultFuture = handleMessageToAws(message);
if (this.sync) {
Long sendTimeout = this.sendTimeoutExpression.getValue(this.evaluationContext, message, Long.class);
if (sendTimeout == null || sendTimeout < 0) {
resultFuture.get();
}
else {
try {
resultFuture.get(sendTimeout, TimeUnit.MILLISECONDS);
}
catch (TimeoutException te) {
throw new MessageTimeoutException(message, "Timeout waiting for response from AmazonKinesis", te);
}
}
}
}
protected <I extends AmazonWebServiceRequest, O> AsyncHandler<I, O> obtainAsyncHandler(final Message<?> message,
final AmazonWebServiceRequest request) {
return new AsyncHandler<I, O>() {
@Override
public void onError(Exception ex) {
if (getAsyncHandler() != null) {
getAsyncHandler().onError(ex);
}
if (getFailureChannel() != null) {
AbstractAwsMessageHandler.this.messagingTemplate.send(getFailureChannel(),
getErrorMessageStrategy()
.buildErrorMessage(new AwsRequestFailureException(message, request, ex), null));
}
}
@Override
@SuppressWarnings("unchecked")
public void onSuccess(I request, O result) {
if (getAsyncHandler() != null) {
((AsyncHandler<I, O>) getAsyncHandler()).onSuccess(request, result);
}
if (getOutputChannel() != null) {
AbstractIntegrationMessageBuilder<?> messageBuilder =
getMessageBuilderFactory()
.fromMessage(message);
additionalOnSuccessHeaders(messageBuilder, request, result);
messageBuilder.setHeaderIfAbsent(AwsHeaders.SERVICE_RESULT, result);
AbstractAwsMessageHandler.this.messagingTemplate.send(getOutputChannel(), messageBuilder.build());
}
}
};
}
protected abstract Future<?> handleMessageToAws(Message<?> message);
protected abstract void additionalOnSuccessHeaders(AbstractIntegrationMessageBuilder<?> messageBuilder,
AmazonWebServiceRequest request, Object result);
}

View File

@@ -18,33 +18,19 @@ package org.springframework.integration.aws.outbound;
import java.nio.ByteBuffer;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.serializer.support.SerializingConverter;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.integration.aws.support.AwsRequestFailureException;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.handler.AbstractMessageProducingHandler;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.AmazonWebServiceResult;
import com.amazonaws.ResponseMetadata;
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.services.kinesis.AmazonKinesisAsync;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
@@ -64,18 +50,12 @@ import com.amazonaws.services.kinesis.model.PutRecordsResult;
* @see AmazonKinesisAsync#putRecords(PutRecordsRequest)
* @see com.amazonaws.handlers.AsyncHandler
*/
public class KinesisMessageHandler extends AbstractMessageProducingHandler {
private static final long DEFAULT_SEND_TIMEOUT = 10000;
public class KinesisMessageHandler extends AbstractAwsMessageHandler {
private final AmazonKinesisAsync amazonKinesis;
private AsyncHandler<? extends AmazonWebServiceRequest, ?> asyncHandler;
private Converter<Object, byte[]> converter = new SerializingConverter();
private EvaluationContext evaluationContext;
private volatile Expression streamExpression;
private volatile Expression partitionKeyExpression;
@@ -84,25 +64,11 @@ public class KinesisMessageHandler extends AbstractMessageProducingHandler {
private volatile Expression sequenceNumberExpression;
private boolean sync;
private Expression sendTimeoutExpression = new ValueExpression<>(DEFAULT_SEND_TIMEOUT);
private MessageChannel failureChannel;
private String failureChannelName;
private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();
public KinesisMessageHandler(AmazonKinesisAsync amazonKinesis) {
Assert.notNull(amazonKinesis, "'amazonKinesis' must not be null.");
this.amazonKinesis = amazonKinesis;
}
public void setAsyncHandler(AsyncHandler<? extends AmazonWebServiceRequest, ?> asyncHandler) {
this.asyncHandler = asyncHandler;
}
/**
* Specify a {@link Converter} to serialize {@code payload} to the {@code byte[]}
* if that isn't {@code byte[]} already.
@@ -157,106 +123,31 @@ public class KinesisMessageHandler extends AbstractMessageProducingHandler {
this.sequenceNumberExpression = sequenceNumberExpression;
}
public void setSync(boolean sync) {
this.sync = sync;
}
public void setSendTimeout(long sendTimeout) {
setSendTimeoutExpression(new ValueExpression<>(sendTimeout));
}
public void setSendTimeoutExpressionString(String sendTimeoutExpression) {
setSendTimeoutExpression(EXPRESSION_PARSER.parseExpression(sendTimeoutExpression));
}
public void setSendTimeoutExpression(Expression sendTimeoutExpression) {
this.sendTimeoutExpression = sendTimeoutExpression;
}
/**
* Set the failure channel. After a failure on put, an {@link ErrorMessage} will be sent
* to this channel with a payload of a {@link AwsRequestFailureException} with the
* failed message and cause.
* @param failureChannel the failure channel.
* @since 1.1.0
*/
public void setFailureChannel(MessageChannel failureChannel) {
this.failureChannel = failureChannel;
}
protected MessageChannel getFailureChannel() {
if (this.failureChannel != null) {
return this.failureChannel;
}
else if (this.failureChannelName != null) {
this.failureChannel = getChannelResolver().resolveDestination(this.failureChannelName);
return this.failureChannel;
}
return null;
}
/**
* Set the failure channel name. After a failure on put, an {@link ErrorMessage} will be
* sent to this channel name with a payload of a {@link AwsRequestFailureException}
* with the failed message and cause.
* @param failureChannelName the failure channel name.
* @since 1.1.0
*/
public void setFailureChannelName(String failureChannelName) {
this.failureChannelName = failureChannelName;
}
@Override
protected void onInit() throws Exception {
super.onInit();
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
}
@Override
@SuppressWarnings("unchecked")
protected void handleMessageInternal(final Message<?> message) throws Exception {
Future<?> resultFuture;
protected Future<?> handleMessageToAws(Message<?> message) {
if (message.getPayload() instanceof PutRecordsRequest) {
AsyncHandler<PutRecordsRequest, PutRecordsResult> asyncHandler =
obtainAsyncHandler(message, (PutRecordsRequest) message.getPayload());
obtainAsyncHandler(message, (PutRecordsRequest) message.getPayload());
resultFuture = this.amazonKinesis.putRecordsAsync((PutRecordsRequest) message.getPayload(), asyncHandler);
return this.amazonKinesis.putRecordsAsync((PutRecordsRequest) message.getPayload(), asyncHandler);
}
else {
final PutRecordRequest putRecordRequest = (message.getPayload() instanceof PutRecordRequest)
? (PutRecordRequest) message.getPayload()
: buildPutRecordRequest(message);
final PutRecordRequest putRecordRequest =
(message.getPayload() instanceof PutRecordRequest)
? (PutRecordRequest) message.getPayload()
: buildPutRecordRequest(message);
AsyncHandler<PutRecordRequest, PutRecordResult> asyncHandler =
obtainAsyncHandler(message, putRecordRequest);
resultFuture = this.amazonKinesis.putRecordAsync(putRecordRequest, asyncHandler);
}
if (this.sync) {
Long sendTimeout = this.sendTimeoutExpression.getValue(this.evaluationContext, message, Long.class);
if (sendTimeout == null || sendTimeout < 0) {
resultFuture.get();
}
else {
try {
resultFuture.get(sendTimeout, TimeUnit.MILLISECONDS);
}
catch (TimeoutException te) {
throw new MessageTimeoutException(message, "Timeout waiting for response from AmazonKinesis", te);
}
}
return this.amazonKinesis.putRecordAsync(putRecordRequest, asyncHandler);
}
}
private PutRecordRequest buildPutRecordRequest(Message<?> message) {
String stream = message.getHeaders().get(AwsHeaders.STREAM, String.class);
if (!StringUtils.hasText(stream) && this.streamExpression != null) {
stream = this.streamExpression.getValue(this.evaluationContext, message, String.class);
stream = this.streamExpression.getValue(getEvaluationContext(), message, String.class);
}
Assert.state(stream != null, "'stream' must not be null for sending a Kinesis record. " +
"Consider configuring this handler with a 'stream'( or 'streamExpression') or supply an " +
@@ -264,7 +155,7 @@ public class KinesisMessageHandler extends AbstractMessageProducingHandler {
String partitionKey = message.getHeaders().get(AwsHeaders.PARTITION_KEY, String.class);
if (!StringUtils.hasText(partitionKey) && this.partitionKeyExpression != null) {
partitionKey = this.partitionKeyExpression.getValue(this.evaluationContext, message, String.class);
partitionKey = this.partitionKeyExpression.getValue(getEvaluationContext(), message, String.class);
}
Assert.state(partitionKey != null, "'partitionKey' must not be null for sending a Kinesis record. " +
"Consider configuring this handler with a 'partitionKey'( or 'partitionKeyExpression') or supply an " +
@@ -272,12 +163,12 @@ public class KinesisMessageHandler extends AbstractMessageProducingHandler {
String explicitHashKey =
(this.explicitHashKeyExpression != null
? this.explicitHashKeyExpression.getValue(this.evaluationContext, message, String.class)
? this.explicitHashKeyExpression.getValue(getEvaluationContext(), message, String.class)
: null);
String sequenceNumber = message.getHeaders().get(AwsHeaders.SEQUENCE_NUMBER, String.class);
if (!StringUtils.hasText(stream) && this.streamExpression != null) {
partitionKey = this.sequenceNumberExpression.getValue(this.evaluationContext, message, String.class);
partitionKey = this.sequenceNumberExpression.getValue(getEvaluationContext(), message, String.class);
}
Object payload = message.getPayload();
@@ -304,53 +195,15 @@ public class KinesisMessageHandler extends AbstractMessageProducingHandler {
.withData(data);
}
@SuppressWarnings("rawtypes")
private <REQUEST extends AmazonWebServiceRequest, RESULT extends AmazonWebServiceResult<?extends ResponseMetadata>> AsyncHandler<REQUEST, RESULT> obtainAsyncHandler(
final Message<?> message, final REQUEST request) {
@Override
protected void additionalOnSuccessHeaders(AbstractIntegrationMessageBuilder<?> messageBuilder,
AmazonWebServiceRequest request, Object result) {
return new AsyncHandler<REQUEST, RESULT>() {
@Override
public void onError(Exception ex) {
if (KinesisMessageHandler.this.asyncHandler != null) {
KinesisMessageHandler.this.asyncHandler.onError(ex);
}
if (getFailureChannel() != null) {
KinesisMessageHandler.this.messagingTemplate.send(getFailureChannel(),
KinesisMessageHandler.this.errorMessageStrategy.buildErrorMessage(
new AwsRequestFailureException(message, request, ex), null));
}
}
@Override
@SuppressWarnings("unchecked")
public void onSuccess(REQUEST request, RESULT result) {
if (KinesisMessageHandler.this.asyncHandler != null) {
((AsyncHandler<REQUEST, RESULT>) KinesisMessageHandler.this.asyncHandler)
.onSuccess(request, result);
}
if (getOutputChannel() != null) {
AbstractIntegrationMessageBuilder<?> messageBuilder =
getMessageBuilderFactory()
.fromMessage(message);
if (result instanceof PutRecordResult) {
messageBuilder
.setHeader(AwsHeaders.SHARD, ((PutRecordResult) result).getShardId())
.setHeader(AwsHeaders.SEQUENCE_NUMBER, ((PutRecordResult) result).getSequenceNumber());
}
else {
messageBuilder.setHeader(AwsHeaders.SERVICE_RESULT, result);
}
KinesisMessageHandler.this.messagingTemplate.send(getOutputChannel(), messageBuilder.build());
}
}
};
if (result instanceof PutRecordResult) {
messageBuilder
.setHeader(AwsHeaders.SHARD, ((PutRecordResult) result).getShardId())
.setHeader(AwsHeaders.SEQUENCE_NUMBER, ((PutRecordResult) result).getSequenceNumber());
}
}
}

View File

@@ -16,26 +16,28 @@
package org.springframework.integration.aws.outbound;
import java.util.concurrent.Future;
import org.springframework.cloud.aws.core.env.ResourceIdResolver;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.TypeLocator;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.support.StandardTypeLocator;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.integration.aws.support.SnsBodyBuilder;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.services.sns.AmazonSNSAsync;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.PublishResult;
/**
* The {@link AbstractReplyProducingMessageHandler} implementation to send SNS Notifications
* ({@link AmazonSNS#publish(PublishRequest)}) to the provided {@code topicArn}
* The {@link AbstractAwsMessageHandler} implementation to send SNS Notifications
* ({@link AmazonSNSAsync#publishAsync(PublishRequest)}) to the provided {@code topicArn}
* (or evaluated at runtime against {@link Message}).
* <p>
* The SNS Message subject can be evaluated as a result of {@link #subjectExpression}.
@@ -69,27 +71,16 @@ import com.amazonaws.services.sns.model.PublishResult;
* to the {@link String} using {@link #getConversionService()}.
* </li>
* </ul>
* <p>
* If this {@link AbstractReplyProducingMessageHandler} is configured with {@link #produceReply} as
* {@code true}, the reply message is composed to be sent to the {@code outputChannel} or
* {@code replyChannel}. The reply message's {@code payload} is exactly the {@link PublishRequest}
* object, which has been just published to SNS. Also this message has {@link AwsHeaders#TOPIC}
* and {@link AwsHeaders#SNS_PUBLISHED_MESSAGE_ID} headers to track published SNS message in the
* downstream.
*
* @author Artem Bilan
*
* @see AmazonSNS
* @see AmazonSNSAsync
* @see PublishRequest
* @see SnsBodyBuilder
*/
public class SnsMessageHandler extends AbstractReplyProducingMessageHandler {
public class SnsMessageHandler extends AbstractAwsMessageHandler {
private final AmazonSNS amazonSns;
private final boolean produceReply;
private EvaluationContext evaluationContext;
private final AmazonSNSAsync amazonSns;
private Expression topicArnExpression;
@@ -99,14 +90,9 @@ public class SnsMessageHandler extends AbstractReplyProducingMessageHandler {
private ResourceIdResolver resourceIdResolver;
public SnsMessageHandler(AmazonSNS amazonSns) {
this(amazonSns, false);
}
public SnsMessageHandler(AmazonSNS amazonSns, boolean produceReply) {
public SnsMessageHandler(AmazonSNSAsync amazonSns) {
Assert.notNull(amazonSns, "amazonSns must not be null.");
this.amazonSns = amazonSns;
this.produceReply = produceReply;
}
public void setTopicArn(String topicArn) {
@@ -151,10 +137,9 @@ public class SnsMessageHandler extends AbstractReplyProducingMessageHandler {
}
@Override
protected void doInit() {
super.doInit();
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
TypeLocator typeLocator = this.evaluationContext.getTypeLocator();
protected void onInit() throws Exception {
super.onInit();
TypeLocator typeLocator = getEvaluationContext().getTypeLocator();
if (typeLocator instanceof StandardTypeLocator) {
/*
* Register the 'org.springframework.integration.aws.support' package
@@ -165,8 +150,8 @@ public class SnsMessageHandler extends AbstractReplyProducingMessageHandler {
}
@Override
protected Object handleRequestMessage(Message<?> requestMessage) {
Object payload = requestMessage.getPayload();
protected Future<?> handleMessageToAws(Message<?> message) {
Object payload = message.getPayload();
PublishRequest publishRequest = null;
@@ -176,21 +161,21 @@ public class SnsMessageHandler extends AbstractReplyProducingMessageHandler {
else {
Assert.state(this.topicArnExpression != null, "'topicArn' or 'topicArnExpression' must be specified.");
publishRequest = new PublishRequest();
String topicArn = this.topicArnExpression.getValue(this.evaluationContext, requestMessage, String.class);
String topicArn = this.topicArnExpression.getValue(getEvaluationContext(), message, String.class);
if (this.resourceIdResolver != null) {
topicArn = this.resourceIdResolver.resolveToPhysicalResourceId(topicArn);
}
publishRequest.setTopicArn(topicArn);
if (this.subjectExpression != null) {
String subject = this.subjectExpression.getValue(this.evaluationContext, requestMessage, String.class);
String subject = this.subjectExpression.getValue(getEvaluationContext(), message, String.class);
publishRequest.setSubject(subject);
}
Object snsMessage = requestMessage.getPayload();
Object snsMessage = message.getPayload();
if (this.bodyExpression != null) {
snsMessage = this.bodyExpression.getValue(this.evaluationContext, requestMessage);
snsMessage = this.bodyExpression.getValue(getEvaluationContext(), message);
}
if (snsMessage instanceof SnsBodyBuilder) {
@@ -202,16 +187,25 @@ public class SnsMessageHandler extends AbstractReplyProducingMessageHandler {
}
}
PublishResult publishResult = this.amazonSns.publish(publishRequest);
AsyncHandler<PublishRequest, PublishResult> asyncHandler = obtainAsyncHandler(message, publishRequest);
return this.amazonSns.publishAsync(publishRequest, asyncHandler);
if (this.produceReply) {
return getMessageBuilderFactory()
.withPayload(publishRequest)
.setHeader(AwsHeaders.TOPIC, publishRequest.getTopicArn())
.setHeader(AwsHeaders.SNS_PUBLISHED_MESSAGE_ID, publishResult.getMessageId());
}
@Override
protected void additionalOnSuccessHeaders(AbstractIntegrationMessageBuilder<?> messageBuilder,
AmazonWebServiceRequest request, Object result) {
if (request instanceof PublishRequest) {
PublishRequest publishRequest = (PublishRequest) request;
messageBuilder.setHeader(AwsHeaders.TOPIC, publishRequest.getTopicArn());
}
else {
return null;
if (result instanceof PublishResult) {
PublishResult publishResult = (PublishResult) result;
messageBuilder.setHeader(AwsHeaders.MESSAGE_ID, publishResult.getMessageId());
}
}

View File

@@ -16,19 +16,30 @@
package org.springframework.integration.aws.outbound;
import java.util.concurrent.Future;
import org.springframework.cloud.aws.core.env.ResourceIdResolver;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.expression.EvaluationContext;
import org.springframework.cloud.aws.messaging.support.destination.DynamicQueueUrlDestinationResolver;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.GenericMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.core.DestinationResolver;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageResult;
/**
* The {@link AbstractMessageHandler} implementation for the Amazon SQS {@code sendMessage}.
@@ -37,33 +48,43 @@ import com.amazonaws.services.sqs.AmazonSQSAsync;
* @author Rahul Pilani
* @author Taylor Wicksell
*
* @see QueueMessagingTemplate
* @see org.springframework.cloud.aws.messaging.core.QueueMessageChannel
*/
public class SqsMessageHandler extends AbstractMessageHandler {
* @see AmazonSQSAsync#sendMessageAsync(SendMessageRequest, AsyncHandler)
* @see com.amazonaws.handlers.AsyncHandler
private final QueueMessagingTemplate template;
*/
public class SqsMessageHandler extends AbstractAwsMessageHandler {
private final AmazonSQSAsync amazonSqs;
private final DestinationResolver<?> destinationResolver;
private MessageConverter messageConverter;
private Expression queueExpression;
private EvaluationContext evaluationContext;
private Expression delayExpression;
private Expression messageGroupIdExpression;
private Expression messageDeduplicationIdExpression;
public SqsMessageHandler(AmazonSQSAsync amazonSqs) {
this(amazonSqs, null);
}
public SqsMessageHandler(AmazonSQSAsync amazonSqs, ResourceIdResolver resourceIdResolver) {
this(new QueueMessagingTemplate(amazonSqs, resourceIdResolver));
}
public SqsMessageHandler(QueueMessagingTemplate template) {
Assert.notNull(template, "template must not be null.");
this.template = template;
this.amazonSqs = amazonSqs;
this.destinationResolver = new DynamicQueueUrlDestinationResolver(amazonSqs, resourceIdResolver);
}
public void setQueue(String queue) {
Assert.hasText(queue, "'queue' must not be empty");
this.queueExpression = new LiteralExpression(queue);
setQueueExpression(new LiteralExpression(queue));
}
public void setQueueExpressionString(String queueExpression) {
setQueueExpression(EXPRESSION_PARSER.parseExpression(queueExpression));
}
public void setQueueExpression(Expression queueExpression) {
@@ -71,22 +92,116 @@ public class SqsMessageHandler extends AbstractMessageHandler {
this.queueExpression = queueExpression;
}
public void setDelay(int delaySeconds) {
setDelayExpression(new ValueExpression<>(delaySeconds));
}
public void setDelayExpressionString(String delayExpression) {
setDelayExpression(EXPRESSION_PARSER.parseExpression(delayExpression));
}
public void setDelayExpression(Expression delayExpression) {
Assert.notNull(delayExpression, "'delayExpression' must not be null");
this.delayExpression = delayExpression;
}
public void setMessageGroupId(String messageGroupId) {
setMessageGroupIdExpression(new LiteralExpression(messageGroupId));
}
public void setMessageGroupIdExpressionString(String groupIdExpression) {
setMessageGroupIdExpression(EXPRESSION_PARSER.parseExpression(groupIdExpression));
}
public void setMessageGroupIdExpression(Expression messageGroupIdExpression) {
Assert.notNull(messageGroupIdExpression, "'messageGroupIdExpression' must not be null");
this.messageGroupIdExpression = messageGroupIdExpression;
}
public void setMessageDeduplicationId(String messageDeduplicationId) {
setMessageDeduplicationIdExpression(new LiteralExpression(messageDeduplicationId));
}
public void setMessageDeduplicationIdExpressionString(String messageDeduplicationIdExpression) {
setMessageDeduplicationIdExpression(EXPRESSION_PARSER.parseExpression(messageDeduplicationIdExpression));
}
public void setMessageDeduplicationIdExpression(Expression messageDeduplicationIdExpression) {
Assert.notNull(messageDeduplicationIdExpression, "'messageDeduplicationIdExpression' must not be null");
this.messageDeduplicationIdExpression = messageDeduplicationIdExpression;
}
public void setMessageConverter(MessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
@Override
protected void onInit() throws Exception {
super.onInit();
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
if (this.messageConverter == null) {
this.messageConverter = new GenericMessageConverter(getConversionService());
}
}
@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
String queue = message.getHeaders().get(AwsHeaders.QUEUE, String.class);
if (!StringUtils.hasText(queue) && this.queueExpression != null) {
queue = this.queueExpression.getValue(this.evaluationContext, message, String.class);
@SuppressWarnings("unchecked")
protected Future<?> handleMessageToAws(Message<?> message) {
Object payload = message.getPayload();
if (payload instanceof SendMessageBatchRequest) {
AsyncHandler<SendMessageBatchRequest, SendMessageBatchResult> asyncHandler =
obtainAsyncHandler(message, (SendMessageBatchRequest) payload);
return this.amazonSqs.sendMessageBatchAsync((SendMessageBatchRequest) payload, asyncHandler);
}
SendMessageRequest sendMessageRequest;
if (payload instanceof SendMessageRequest) {
sendMessageRequest = (SendMessageRequest) payload;
}
else {
String queue = message.getHeaders().get(AwsHeaders.QUEUE, String.class);
if (!StringUtils.hasText(queue) && this.queueExpression != null) {
queue = this.queueExpression.getValue(getEvaluationContext(), message, String.class);
}
Assert.state(queue != null, "'queue' must not be null for sending an SQS message. " +
"Consider configuring this handler with a 'queue'( or 'queueExpression') or supply an " +
"'aws_queue' message header");
String queueUrl = (String) this.destinationResolver.resolveDestination(queue);
String messageBody = (String) this.messageConverter.fromMessage(message, String.class);
sendMessageRequest = new SendMessageRequest(queueUrl, messageBody);
if (this.delayExpression != null) {
Integer delay = this.delayExpression.getValue(getEvaluationContext(), message, Integer.class);
sendMessageRequest.setDelaySeconds(delay);
}
if (this.messageGroupIdExpression != null) {
String messageGroupId =
this.messageGroupIdExpression.getValue(getEvaluationContext(), message, String.class);
sendMessageRequest.setMessageGroupId(messageGroupId);
}
if (this.messageDeduplicationIdExpression != null) {
String messageDeduplicationId =
this.messageDeduplicationIdExpression.getValue(getEvaluationContext(), message, String.class);
sendMessageRequest.setMessageDeduplicationId(messageDeduplicationId);
}
}
AsyncHandler<SendMessageRequest, SendMessageResult> asyncHandler =
obtainAsyncHandler(message, sendMessageRequest);
return this.amazonSqs.sendMessageAsync(sendMessageRequest, asyncHandler);
}
@Override
protected void additionalOnSuccessHeaders(AbstractIntegrationMessageBuilder<?> messageBuilder,
AmazonWebServiceRequest request, Object result) {
if (result instanceof SendMessageResult) {
SendMessageResult sendMessageResult = (SendMessageResult) result;
messageBuilder.setHeaderIfAbsent(AwsHeaders.MESSAGE_ID, sendMessageResult.getMessageId());
messageBuilder.setHeaderIfAbsent(AwsHeaders.SEQUENCE_NUMBER, sendMessageResult.getSequenceNumber());
}
Assert.state(queue != null, "'queue' must not be null for sending an SQS message. " +
"Consider configuring this handler with a 'queue'( or 'queueExpression') or supply an " +
"'aws_queue' message header");
this.template.send(queue, message);
}
}

View File

@@ -65,11 +65,6 @@ public abstract class AwsHeaders {
*/
public static final String SNS_MESSAGE_TYPE = PREFIX + "snsMessageType";
/**
* The {@value SNS_PUBLISHED_MESSAGE_ID} header for message published over SNS.
*/
public static final String SNS_PUBLISHED_MESSAGE_ID = PREFIX + "snsPublishedMessageId";
/**
* The {@value SHARD} header to represent Kinesis shardId.
*/
@@ -101,7 +96,7 @@ public abstract class AwsHeaders {
public static final String PARTITION_KEY = PREFIX + "partitionKey";
/**
* The {@value SEQUENCE_NUMBER} header for sending data to Kinesis.
* The {@value SEQUENCE_NUMBER} header for sending data to SQS/Kinesis.
*/
public static final String SEQUENCE_NUMBER = PREFIX + "sequenceNumber";

View File

@@ -248,9 +248,9 @@
<xsd:simpleType name="s3CommandType">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="UPLOAD" />
<xsd:enumeration value="DOWNLOAD" />
<xsd:enumeration value="COPY" />
<xsd:enumeration value="UPLOAD"/>
<xsd:enumeration value="DOWNLOAD"/>
<xsd:enumeration value="COPY"/>
</xsd:restriction>
</xsd:simpleType>
@@ -303,7 +303,7 @@
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.file.filters.FileListFilter" />
type="org.springframework.integration.file.filters.FileListFilter"/>
</tool:annotation>
</xsd:appinfo>
<xsd:documentation>
@@ -357,22 +357,22 @@
</xsd:annotation>
<xsd:complexType>
<xsd:complexContent>
<xsd:extension base="base-s3-inbound-adapter-type" />
<xsd:extension base="base-s3-inbound-adapter-type"/>
</xsd:complexContent>
</xsd:complexType>
</xsd:element>
<xsd:complexType name="base-s3-inbound-adapter-type">
<xsd:sequence>
<xsd:element ref="integration:poller" minOccurs="0" maxOccurs="1" />
<xsd:element ref="integration:poller" minOccurs="0" maxOccurs="1"/>
</xsd:sequence>
<xsd:attributeGroup ref="integration:channelAdapterAttributes" />
<xsd:attributeGroup ref="integration:channelAdapterAttributes"/>
<xsd:attribute name="session-factory" type="xsd:string" use="required">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.file.remote.session.SessionFactory" />
type="org.springframework.integration.file.remote.session.SessionFactory"/>
</tool:annotation>
</xsd:appinfo>
<xsd:documentation>
@@ -383,10 +383,10 @@
</xsd:attribute>
<xsd:attribute name="remote-file-separator" type="xsd:string" default="/">
<xsd:annotation>
<xsd:documentation>
Allows you to provide remote file/directory
separator character. DEFAULT: '/'
</xsd:documentation>
<xsd:documentation>
Allows you to provide remote file/directory
separator character. DEFAULT: '/'
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="remote-directory" type="xsd:string" use="optional">
@@ -428,7 +428,7 @@
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type type="org.springframework.integration.file.filters.FileListFilter" />
<tool:expected-type type="org.springframework.integration.file.filters.FileListFilter"/>
</tool:annotation>
</xsd:appinfo>
<xsd:documentation>
@@ -450,6 +450,92 @@
</xsd:attribute>
</xsd:complexType>
<xsd:attributeGroup name="awsAsyncOutboundAttributes">
<xsd:attribute name="sync">
<xsd:annotation>
<xsd:documentation>
Boolean value to indicate whether the target AWS operation should be performed async (default)
or sync manner.
</xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:union memberTypes="xsd:boolean xsd:string" />
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="send-timeout">
<xsd:annotation>
<xsd:documentation>
The timeout in milliseconds to wait for AWS response in sync mode.
Defaults to 10 seconds.
Mutually exclusive with 'send-timeout-expression'.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="send-timeout-expression">
<xsd:annotation>
<xsd:documentation>
A SpEL expression that resolves a timeout in milliseconds at runtime
to wait for AWS response in sync mode.
Mutually exclusive with 'send-timeout'.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="error-message-strategy" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type type="org.springframework.integration.support.ErrorMessageStrategy" />
</tool:annotation>
</xsd:appinfo>
<xsd:documentation>
The reference to the 'org.springframework.integration.support.ErrorMessageStrategy' bean.
Defaults to 'DefaultErrorMessageStrategy'.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="failure-channel" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type type="org.springframework.messaging.MessageChannel" />
</tool:annotation>
</xsd:appinfo>
<xsd:documentation>
The message channel to send error messages in the async mode.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="success-channel" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type type="org.springframework.messaging.MessageChannel" />
</tool:annotation>
</xsd:appinfo>
<xsd:documentation>
The message channel to send confirmation messages from the callback in the async mode.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="async-handler" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type type="com.amazonaws.handlers.AsyncHandler" />
</tool:annotation>
</xsd:appinfo>
<xsd:documentation>
Asynchronous callback handler for events in the lifecycle of the request. Users can provide an
implementation of the callback methods in this interface to receive notification of successful or
unsuccessful completion of the operation.
By default successful reply is sent to the 'success-channel' and error message to the
'failure-channel' if they are provided.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:attributeGroup>
<xsd:element name="sqs-outbound-channel-adapter">
<xsd:complexType>
<xsd:annotation>
@@ -471,6 +557,7 @@
Mutually exclusive with 'queue-expression'.
This attribute isn't mandatory and the queue can be specified in message headers
with the 'AwsHeaders.QUEUE' header name.
Mutually exclusive with 'queue-expression'.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
@@ -482,24 +569,86 @@
Mutually exclusive with 'queue'.
This attribute isn't mandatory and the queue can be specified in message headers with
the 'AwsHeaders.QUEUE' header name.
Mutually exclusive with 'queue'.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="queue-messaging-template">
<xsd:attribute name="delay">
<xsd:annotation>
<xsd:documentation>
A bean reference to the QueueMessagingTemplate that this class is supposed to use.
This attribute is mutually exclusive with the 'sqs' and 'resource-id-resolver'
attributes.
The length of time, in seconds, for which to delay a specific message.
Valid values: 0 to 900. Maximum: 15 minutes.
Messages with a positive delay value become available for processing after the delay
period is finished.
If not specified, the default value for the queue applies.
Mutually exclusive with 'delay-expression'.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="delay-expression">
<xsd:annotation>
<xsd:documentation>
A SpEL expression that resolves to the length of time, in seconds,
for which to delay a specific message.
Mutually exclusive with 'delay'.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="message-group-id">
<xsd:annotation>
<xsd:documentation>
The tag that specifies that a message belongs to a specific message group.
Messages that belong to the same message group are processed in a FIFO manner
(however, messages in different message groups might be processed out of order).
To interleave multiple ordered streams within a single queue, use 'MessageGroupId'
values (for example, session data for multiple users).
In this scenario, multiple readers can process the queue, but the session data
of each user is processed in a FIFO fashion.
Mutually exclusive with 'message-group-id-expression'.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="message-group-id-expression">
<xsd:annotation>
<xsd:documentation>
A SpEL expression that resolves a 'MessageGroupId' token at runtime.
Mutually exclusive with 'message-group-id'.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="message-deduplication-id">
<xsd:annotation>
<xsd:documentation>
The token used for deduplication of sent messages.
If a message with a particular 'MessageDeduplicationId' is sent successfully,
any messages sent with the same 'MessageDeduplicationId' are accepted successfully
but aren't delivered during the 5-minute deduplication interval.
Mutually exclusive with 'message-deduplication-id-expression'.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="message-deduplication-id-expression">
<xsd:annotation>
<xsd:documentation>
A SpEL expression that resolves a 'MessageDeduplicationId' token at runtime.
Mutually exclusive with 'message-deduplication-id'.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="message-converter">
<xsd:annotation>
<xsd:documentation>
A bean reference to the MessageConverter.
</xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate"/>
type="org.springframework.messaging.converter.MessageConverter"/>
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attributeGroup ref="awsAsyncOutboundAttributes"/>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
@@ -725,70 +874,7 @@
<xsd:complexContent>
<xsd:extension base="baseSnsAdapterType">
<xsd:attributeGroup ref="integration:channelAdapterAttributes"/>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
</xsd:element>
<xsd:element name="sns-outbound-gateway">
<xsd:complexType>
<xsd:annotation>
<xsd:documentation>
Defines an outbound SNS Channel Adapter for publishing messages to the topic.
</xsd:documentation>
</xsd:annotation>
<xsd:complexContent>
<xsd:extension base="baseSnsAdapterType">
<xsd:attribute name="request-channel" use="required" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type type="org.springframework.messaging.MessageChannel"/>
</tool:annotation>
</xsd:appinfo>
<xsd:documentation>
Identifies the request channel attached to this gateway.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="reply-channel" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type type="org.springframework.messaging.MessageChannel"/>
</tool:annotation>
</xsd:appinfo>
<xsd:documentation>
Identifies the reply channel attached to this
gateway.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="reply-timeout" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
Allows you to specify how long this gateway will wait for
the reply message to be sent successfully to the reply channel
before throwing an exception. This attribute only applies when the
channel might block, for example when using a bounded queue channel that
is currently full.
Also, keep in mind that when sending to a DirectChannel, the
invocation will occur in the sender's thread. Therefore,
the failing of the send operation may be caused by other
components further downstream.
The "reply-timeout" attribute maps to the "sendTimeout" property of the
underlying 'MessagingTemplate' instance (org.springframework.integration.core.MessagingTemplate).
The attribute will default, if not specified, to '-1', meaning that
by default, the Gateway will wait indefinitely. The value is
specified in milliseconds.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="id" type="xsd:string"/>
<xsd:attributeGroup ref="integration:smartLifeCycleAttributeGroup"/>
<xsd:attributeGroup ref="awsAsyncOutboundAttributes"/>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
@@ -801,7 +887,7 @@
Base type for the 'sqs-message-driven-channel-adapter' and 'sqs-outbound-channel-adapter' elements.
</xsd:documentation>
</xsd:annotation>
<xsd:attribute name="sqs">
<xsd:attribute name="sqs" use="required">
<xsd:annotation>
<xsd:documentation>
The 'com.amazonaws.services.sqs.AmazonSQS' bean reference.

View File

@@ -8,34 +8,39 @@
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd">
<bean id="amazonSns" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="com.amazonaws.services.sns.AmazonSNS"/>
<constructor-arg value="com.amazonaws.services.sns.AmazonSNSAsync"/>
</bean>
<bean id="resourceIdResolver" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.cloud.aws.core.env.ResourceIdResolver" />
</bean>
<bean id="errorMessageStrategy" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.integration.support.ErrorMessageStrategy"/>
</bean>
<bean id="asyncHandler" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="com.amazonaws.handlers.AsyncHandler"/>
</bean>
<int:channel id="successChannel"/>
<int:channel id="notificationChannel"/>
<int-aws:sns-outbound-channel-adapter
id="defaultAdapter"
channel="notificationChannel"
failure-channel="errorChannel"
sns="amazonSns"
success-channel="successChannel"
error-message-strategy="errorMessageStrategy"
async-handler="asyncHandler"
send-timeout="202"
sync="false"
resource-id-resolver="resourceIdResolver">
<int-aws:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice"/>
</int-aws:request-handler-advice-chain>
</int-aws:sns-outbound-channel-adapter>
<int:channel id="notificationChannel"/>
<int-aws:sns-outbound-gateway
id="snsGateway"
sns="amazonSns"
request-channel="notificationChannel"
reply-channel="errorChannel"
topic-arn="foo"
subject="bar"
body-expression="payload.toUpperCase()"
resource-id-resolver="resourceIdResolver"
auto-startup="false"
phase="201"/>
</beans>

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,18 +18,14 @@ package org.springframework.integration.aws.config.xml;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.List;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.aws.core.env.ResourceIdResolver;
import org.springframework.expression.Expression;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@@ -37,7 +33,8 @@ import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.services.sns.AmazonSNSAsync;
/**
* @author Artem Bilan
@@ -48,18 +45,14 @@ import com.amazonaws.services.sns.AmazonSNS;
public class SnsOutboundChannelAdapterParserTests {
@Autowired
private AmazonSNS amazonSns;
@Autowired
@Qualifier("defaultAdapter")
private MessageChannel defaultAdapterChannel;
private AmazonSNSAsync amazonSns;
@Autowired
@Qualifier("errorChannel")
private MessageChannel errorChannel;
@Autowired
@Qualifier("defaultAdapter.adapter")
@Qualifier("defaultAdapter")
private AbstractEndpoint defaultAdapter;
@Autowired
@@ -70,29 +63,24 @@ public class SnsOutboundChannelAdapterParserTests {
@Qualifier("notificationChannel")
private MessageChannel notificationChannel;
@Autowired
@Qualifier("snsGateway")
private AbstractEndpoint snsGateway;
@Autowired
@Qualifier("snsGateway.handler")
private MessageHandler snsGatewayHandler;
@Autowired
private ResourceIdResolver resourceIdResolver;
@Autowired
private ErrorMessageStrategy errorMessageStrategy;
@Autowired
private AsyncHandler<?, ?> asyncHandler;
@Autowired
private MessageChannel successChannel;
@Test
public void testSnsOutboundChannelAdapterDefaultParser() throws Exception {
public void testSnsOutboundChannelAdapterDefaultParser() {
Object handler = TestUtils.getPropertyValue(this.defaultAdapter, "handler");
assertThat(AopUtils.isAopProxy(handler)).isFalse();
assertThat(handler).isSameAs(this.defaultAdapterHandler);
assertThat(TestUtils.getPropertyValue(handler, "adviceChain", List.class).get(0))
.isInstanceOf(RequestHandlerRetryAdvice.class);
assertThat(TestUtils.getPropertyValue(this.defaultAdapter, "inputChannel"))
.isSameAs(this.defaultAdapterChannel);
.isSameAs(this.notificationChannel);
assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "amazonSns")).isSameAs(this.amazonSns);
assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "evaluationContext")).isNotNull();
@@ -101,30 +89,28 @@ public class SnsOutboundChannelAdapterParserTests {
assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "bodyExpression")).isNull();
assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "resourceIdResolver"))
.isSameAs(this.resourceIdResolver);
}
assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "failureChannel"))
.isSameAs(this.errorChannel);
@Test
public void testSnsOutboundGatewayParser() {
assertThat(TestUtils.getPropertyValue(this.snsGateway, "inputChannel")).isSameAs(this.notificationChannel);
assertThat(TestUtils.getPropertyValue(this.snsGateway, "handler")).isSameAs(this.snsGatewayHandler);
assertThat(TestUtils.getPropertyValue(this.snsGateway, "autoStartup", Boolean.class)).isFalse();
assertThat(TestUtils.getPropertyValue(this.snsGateway, "phase", Integer.class)).isEqualTo(201);
assertThat(TestUtils.getPropertyValue(this.snsGatewayHandler, "produceReply", Boolean.class)).isTrue();
assertThat(TestUtils.getPropertyValue(this.snsGatewayHandler, "outputChannel")).isSameAs(this.errorChannel);
assertThat(TestUtils.getPropertyValue(this.snsGatewayHandler, "resourceIdResolver"))
assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "resourceIdResolver"))
.isSameAs(this.resourceIdResolver);
assertThat(TestUtils.getPropertyValue(this.snsGatewayHandler, "amazonSns")).isSameAs(this.amazonSns);
assertThat(TestUtils.getPropertyValue(this.snsGatewayHandler, "evaluationContext")).isNotNull();
assertThat(TestUtils.getPropertyValue(this.snsGatewayHandler, "topicArnExpression", Expression.class)
.getExpressionString())
.isEqualTo("foo");
assertThat(TestUtils.getPropertyValue(this.snsGatewayHandler, "subjectExpression", Expression.class)
.getExpressionString())
.isEqualTo("bar");
assertThat(TestUtils.getPropertyValue(this.snsGatewayHandler, "bodyExpression", Expression.class)
.getExpressionString())
.isEqualTo("payload.toUpperCase()");
assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "outputChannel"))
.isSameAs(this.successChannel);
assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "errorMessageStrategy"))
.isSameAs(this.errorMessageStrategy);
assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "asyncHandler"))
.isSameAs(this.asyncHandler);
assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler, "sync", Boolean.class))
.isFalse();
assertThat(TestUtils.getPropertyValue(this.defaultAdapterHandler,
"sendTimeoutExpression.literalValue"))
.isEqualTo("202");
}
}

View File

@@ -2,9 +2,11 @@
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int-aws="http://www.springframework.org/schema/integration/aws"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:aws-messaging="http://www.springframework.org/schema/cloud/aws/messaging"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration/aws http://www.springframework.org/schema/integration/aws/spring-integration-aws.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/cloud/aws/messaging http://www.springframework.org/schema/cloud/aws/messaging/spring-cloud-aws-messaging.xsd">
<aws-messaging:sqs-async-client id="sqs"/>
@@ -13,12 +15,38 @@
<constructor-arg value="org.springframework.cloud.aws.core.env.ResourceIdResolver"/>
</bean>
<bean id="errorMessageStrategy" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.integration.support.ErrorMessageStrategy"/>
</bean>
<bean id="messageConverter" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.messaging.converter.MessageConverter"/>
</bean>
<bean id="asyncHandler" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="com.amazonaws.handlers.AsyncHandler"/>
</bean>
<int:channel id="failureChannel"/>
<int:channel id="successChannel"/>
<int-aws:sqs-outbound-channel-adapter sqs="sqs"
auto-startup="false"
channel="errorChannel"
phase="100"
id="sqsOutboundChannelAdapter"
queue="foo"
delay-expression="'200'"
message-deduplication-id="foo"
message-group-id-expression="'bar'"
send-timeout="202"
sync="false"
error-message-strategy="errorMessageStrategy"
failure-channel="failureChannel"
success-channel="successChannel"
message-converter="messageConverter"
async-handler="asyncHandler"
resource-id-resolver="resourceIdResolver"/>
</beans>

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2015-2016 the original author or authors.
* Copyright 2015-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,13 +25,16 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.aws.core.env.ResourceIdResolver;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.services.sqs.AmazonSQS;
/**
@@ -48,9 +51,24 @@ public class SqsMessageHandlerParserTests {
@Autowired
private ResourceIdResolver resourceIdResolver;
@Autowired
private ErrorMessageStrategy errorMessageStrategy;
@Autowired
private MessageConverter messageConverter;
@Autowired
private AsyncHandler<?, ?> asyncHandler;
@Autowired
private MessageChannel errorChannel;
@Autowired
private MessageChannel failureChannel;
@Autowired
private MessageChannel successChannel;
@Autowired
private EventDrivenConsumer sqsOutboundChannelAdapter;
@@ -60,10 +78,10 @@ public class SqsMessageHandlerParserTests {
@Test
public void testSqsMessageHandlerParser() {
assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler, "template.amazonSqs"))
assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler, "amazonSqs"))
.isSameAs(this.amazonSqs);
assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler,
"template.destinationResolver.targetDestinationResolver.resourceIdResolver"))
"destinationResolver.resourceIdResolver"))
.isSameAs(this.resourceIdResolver);
assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler,
"queueExpression.literalValue"))
@@ -75,6 +93,40 @@ public class SqsMessageHandlerParserTests {
.isSameAs(this.errorChannel);
assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapter, "handler"))
.isSameAs(this.sqsOutboundChannelAdapterHandler);
assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler,
"delayExpression.expression"))
.isEqualTo("'200'");
assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler,
"messageDeduplicationIdExpression.literalValue"))
.isEqualTo("foo");
assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler,
"messageGroupIdExpression.expression"))
.isEqualTo("'bar'");
assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler, "failureChannel"))
.isSameAs(this.failureChannel);
assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler, "outputChannel"))
.isSameAs(this.successChannel);
assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler, "messageConverter"))
.isSameAs(this.messageConverter);
assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler, "errorMessageStrategy"))
.isSameAs(this.errorMessageStrategy);
assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler, "asyncHandler"))
.isSameAs(this.asyncHandler);
assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler, "sync", Boolean.class))
.isFalse();
assertThat(TestUtils.getPropertyValue(this.sqsOutboundChannelAdapterHandler,
"sendTimeoutExpression.literalValue"))
.isEqualTo("202");
}
}

View File

@@ -13,8 +13,6 @@
<constructor-arg value="org.springframework.cloud.aws.core.env.ResourceIdResolver"/>
</bean>
<aws-messaging:queue-messaging-template amazon-sqs="sqs" id="queueMessagingTemplate"/>
<int-aws:sqs-outbound-channel-adapter sqs="sqs"
auto-startup="false"
channel="errorChannel"
@@ -23,11 +21,4 @@
queue="foo"
resource-id-resolver="resourceIdResolver"/>
<int-aws:sqs-outbound-channel-adapter auto-startup="false"
channel="errorChannel"
phase="100"
id="sqsOutboundChannelAdapterWithQueueMessagingTemplate"
queue="foo"
queue-messaging-template="queueMessagingTemplate"/>
</beans>

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2015-2016 the original author or authors.
* Copyright 2015-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,15 +16,10 @@
package org.springframework.integration.aws.config.xml;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.Test;
import org.springframework.beans.factory.BeanDefinitionStoreException;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.aws.outbound.SqsMessageHandler;
import org.springframework.integration.test.util.TestUtils;
/**
* @author Rahul Pilani
@@ -52,21 +47,4 @@ public class SqsOutboundChannelAdapterParserTests {
new ClassPathXmlApplicationContext("SqsOutboundChannelAdapterParserTests-context-bad4.xml", getClass()).close();
}
@Test
public void test_happy_path_with_queue_messaging_template() {
ConfigurableApplicationContext applicationContext =
new ClassPathXmlApplicationContext("SqsOutboundChannelAdapterParserTests-context-good.xml", getClass());
SqsMessageHandler handlerWithTemplate =
applicationContext.getBean("sqsOutboundChannelAdapterWithQueueMessagingTemplate.handler",
SqsMessageHandler.class);
assertThat(TestUtils.getPropertyValue(handlerWithTemplate, "template")).isNotNull();
SqsMessageHandler handlerWithSqs = applicationContext.getBean("sqsOutboundChannelAdapterWithSqs.handler",
SqsMessageHandler.class);
assertThat(TestUtils.getPropertyValue(handlerWithSqs, "template")).isNotNull();
applicationContext.close();
}
}

View File

@@ -1,94 +0,0 @@
/*
* Copyright 2015-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.aws.outbound;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.SendMessageRequest;
/**
* Parent class to contain tests that exercise the SqsMessageHandler class
*
* Subclasses can instantiate SqsMessageHandler in their own way.
*
* @author Rahul Pilani
* @author Artem Bilan
*/
@DirtiesContext
public abstract class AbstractSqsMessageHandlerTests {
@Autowired
protected AmazonSQS amazonSqs;
@Autowired
protected MessageChannel sqsSendChannel;
@Autowired
protected SqsMessageHandler sqsMessageHandler;
@Test
public void testSqsMessageHandler() {
Message<String> message = MessageBuilder.withPayload("message").build();
try {
this.sqsSendChannel.send(message);
}
catch (Exception e) {
assertThat(e).isInstanceOf(MessageHandlingException.class);
assertThat(e.getCause()).isInstanceOf(IllegalStateException.class);
}
this.sqsMessageHandler.setQueue("foo");
this.sqsSendChannel.send(message);
ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor =
ArgumentCaptor.forClass(SendMessageRequest.class);
verify(this.amazonSqs).sendMessage(sendMessageRequestArgumentCaptor.capture());
assertThat(sendMessageRequestArgumentCaptor.getValue().getQueueUrl())
.isEqualTo("http://queue-url.com/foo");
message = MessageBuilder.withPayload("message").setHeader(AwsHeaders.QUEUE, "bar").build();
this.sqsSendChannel.send(message);
verify(this.amazonSqs, times(2)).sendMessage(sendMessageRequestArgumentCaptor.capture());
assertThat(sendMessageRequestArgumentCaptor.getValue().getQueueUrl())
.isEqualTo("http://queue-url.com/bar");
SpelExpressionParser spelExpressionParser = new SpelExpressionParser();
Expression expression = spelExpressionParser.parseExpression("headers.foo");
this.sqsMessageHandler.setQueueExpression(expression);
message = MessageBuilder.withPayload("message").setHeader("foo", "baz").build();
this.sqsSendChannel.send(message);
verify(this.amazonSqs, times(3)).sendMessage(sendMessageRequestArgumentCaptor.capture());
assertThat(sendMessageRequestArgumentCaptor.getValue().getQueueUrl())
.isEqualTo("http://queue-url.com/baz");
}
}

View File

@@ -39,11 +39,14 @@ import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.PollableChannel;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.services.sns.AmazonSNSAsync;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.PublishResult;
@@ -61,28 +64,29 @@ public class SnsMessageHandlerTests {
private MessageChannel sendToSnsChannel;
@Autowired
private AmazonSNS amazonSNS;
private AmazonSNSAsync amazonSNS;
@Autowired
private PollableChannel resultChannel;
@Test
@SuppressWarnings("unchecked")
public void testSnsMessageHandler() {
SnsBodyBuilder payload = SnsBodyBuilder.withDefault("foo")
.forProtocols("{\"foo\" : \"bar\"}", "sms");
QueueChannel replyChannel = new QueueChannel();
Message<?> message = MessageBuilder.withPayload(payload)
.setHeader("topic", "topic")
.setHeader("subject", "subject")
.setReplyChannel(replyChannel)
.build();
this.sendToSnsChannel.send(message);
Message<?> reply = replyChannel.receive(1000);
Message<?> reply = this.resultChannel.receive(10000);
assertThat(reply).isNotNull();
ArgumentCaptor<PublishRequest> captor = ArgumentCaptor.forClass(PublishRequest.class);
verify(this.amazonSNS).publish(captor.capture());
verify(this.amazonSNS).publishAsync(captor.capture(), any(AsyncHandler.class));
PublishRequest publishRequest = captor.getValue();
@@ -92,9 +96,9 @@ public class SnsMessageHandlerTests {
assertThat(publishRequest.getMessage())
.isEqualTo("{\"default\":\"foo\",\"sms\":\"{\\\"foo\\\" : \\\"bar\\\"}\"}");
assertThat(reply.getHeaders().get(AwsHeaders.SNS_PUBLISHED_MESSAGE_ID)).isEqualTo("111");
assertThat(reply.getHeaders().get(AwsHeaders.MESSAGE_ID)).isEqualTo("111");
assertThat(reply.getHeaders().get(AwsHeaders.TOPIC)).isEqualTo("topic");
assertThat(reply.getPayload()).isSameAs(publishRequest);
assertThat(reply.getPayload()).isSameAs(payload);
}
@Configuration
@@ -102,23 +106,35 @@ public class SnsMessageHandlerTests {
public static class ContextConfiguration {
@Bean
public AmazonSNS amazonSNS() {
AmazonSNS mock = mock(AmazonSNS.class);
@SuppressWarnings("unchecked")
public AmazonSNSAsync amazonSNS() {
AmazonSNSAsync mock = mock(AmazonSNSAsync.class);
willAnswer(invocation -> new PublishResult().withMessageId("111"))
willAnswer(invocation -> {
PublishResult publishResult = new PublishResult().withMessageId("111");
AsyncHandler<PublishRequest, PublishResult> asyncHandler = invocation.getArgument(1);
asyncHandler.onSuccess(invocation.getArgument(0), publishResult);
return new AsyncResult<>(publishResult);
})
.given(mock)
.publish(any(PublishRequest.class));
.publishAsync(any(PublishRequest.class), any(AsyncHandler.class));
return mock;
}
@Bean
public PollableChannel resultChannel() {
return new QueueChannel();
}
@Bean
@ServiceActivator(inputChannel = "sendToSnsChannel")
public MessageHandler snsMessageHandler() {
SnsMessageHandler snsMessageHandler = new SnsMessageHandler(amazonSNS(), true);
SnsMessageHandler snsMessageHandler = new SnsMessageHandler(amazonSNS());
snsMessageHandler.setTopicArnExpression(PARSER.parseExpression("headers.topic"));
snsMessageHandler.setSubjectExpression(PARSER.parseExpression("headers.subject"));
snsMessageHandler.setBodyExpression(PARSER.parseExpression("payload"));
snsMessageHandler.setOutputChannel(resultChannel());
return snsMessageHandler;
}

View File

@@ -16,23 +16,38 @@
package org.springframework.integration.aws.outbound;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.model.GetQueueUrlRequest;
import com.amazonaws.services.sqs.model.GetQueueUrlResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
/**
* Instantiating SqsMessageHandler using amazonSqs.
@@ -42,8 +57,57 @@ import com.amazonaws.services.sqs.model.GetQueueUrlResult;
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
public class SqsMessageHandlerTests extends AbstractSqsMessageHandlerTests {
public class SqsMessageHandlerTests {
@Autowired
protected AmazonSQSAsync amazonSqs;
@Autowired
protected MessageChannel sqsSendChannel;
@Autowired
protected SqsMessageHandler sqsMessageHandler;
@Test
@SuppressWarnings("unchecked")
public void testSqsMessageHandler() {
Message<String> message = MessageBuilder.withPayload("message").build();
try {
this.sqsSendChannel.send(message);
}
catch (Exception e) {
assertThat(e).isInstanceOf(MessageHandlingException.class);
assertThat(e.getCause()).isInstanceOf(IllegalStateException.class);
}
this.sqsMessageHandler.setQueue("foo");
this.sqsSendChannel.send(message);
ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor =
ArgumentCaptor.forClass(SendMessageRequest.class);
verify(this.amazonSqs)
.sendMessageAsync(sendMessageRequestArgumentCaptor.capture(), any(AsyncHandler.class));
assertThat(sendMessageRequestArgumentCaptor.getValue().getQueueUrl())
.isEqualTo("http://queue-url.com/foo");
message = MessageBuilder.withPayload("message").setHeader(AwsHeaders.QUEUE, "bar").build();
this.sqsSendChannel.send(message);
verify(this.amazonSqs, times(2))
.sendMessageAsync(sendMessageRequestArgumentCaptor.capture(), any(AsyncHandler.class));
assertThat(sendMessageRequestArgumentCaptor.getValue().getQueueUrl())
.isEqualTo("http://queue-url.com/bar");
SpelExpressionParser spelExpressionParser = new SpelExpressionParser();
Expression expression = spelExpressionParser.parseExpression("headers.foo");
this.sqsMessageHandler.setQueueExpression(expression);
message = MessageBuilder.withPayload("message").setHeader("foo", "baz").build();
this.sqsSendChannel.send(message);
verify(this.amazonSqs, times(3))
.sendMessageAsync(sendMessageRequestArgumentCaptor.capture(), any(AsyncHandler.class));
assertThat(sendMessageRequestArgumentCaptor.getValue().getQueueUrl())
.isEqualTo("http://queue-url.com/baz");
}
@Configuration
@EnableIntegration

View File

@@ -1,81 +0,0 @@
/*
* Copyright 2015-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.aws.outbound;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.mock;
import org.junit.runner.RunWith;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageHandler;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.model.GetQueueUrlRequest;
import com.amazonaws.services.sqs.model.GetQueueUrlResult;
/**
* Instantiating SqsMessageHandler using QueueMessagingTemplate.
*
* @author Rahul Pilani
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
public class SqsMessageHandlerWithQueueMessagingTemplateTests extends AbstractSqsMessageHandlerTests {
@Configuration
@EnableIntegration
public static class ContextConfiguration {
@Bean
public AmazonSQSAsync amazonSqs() {
AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
willAnswer(invocation -> {
GetQueueUrlRequest getQueueUrlRequest = (GetQueueUrlRequest) invocation.getArguments()[0];
GetQueueUrlResult queueUrl = new GetQueueUrlResult();
queueUrl.setQueueUrl("http://queue-url.com/" + getQueueUrlRequest.getQueueName());
return queueUrl;
})
.given(amazonSqs)
.getQueueUrl(any(GetQueueUrlRequest.class));
return amazonSqs;
}
@Bean
public QueueMessagingTemplate queueMessagingTemplate() {
return new QueueMessagingTemplate(amazonSqs());
}
@Bean
@ServiceActivator(inputChannel = "sqsSendChannel")
public MessageHandler sqsMessageHandler() {
return new SqsMessageHandler(queueMessagingTemplate());
}
}
}