INTSAMPLES-88 Message Handler Advice Samples

Initial Commit

Stateless Retry

INTSAMPLES-88 Add Stateful Retry (AMQP)

INTSAMPLES-88 Add Circuit Breaker Demo

INTSAMPLES-88 Add Expression Evaluating Advice

See README.md

Fix EE

Polishing
This commit is contained in:
Gary Russell
2012-09-13 15:28:19 -04:00
parent 022458b3ca
commit 542f3569e4
14 changed files with 878 additions and 0 deletions

View File

@@ -61,6 +61,7 @@ This category targets developers who are already more familiar with the Spring I
* **stored-procedures-derby** Provides an example of the stored procedure Outbound Gateway using *[Apache Derby](http://db.apache.org/derby/)*
* **stored-procedures-oracle** Provides an example of the stored procedure Outbound Gateway using *ORACLE XE*
* **monitoring** The project used in the *[Spring Integration Management and Monitoring Webinar](http://www.springsource.org/node/3598)* Also available on the *[SpringSourceDev YouTube Channel](http://www.youtube.com/SpringSourceDev)*
* **retry-and-more** Provides samples showing the application of MessageHandler Advice Chains to endpoints - retry, circuit breaker, expression evaluating
## Advanced

View File

@@ -0,0 +1,60 @@
Handler Advice Sample "retry-and-more"
======================================
This sample shows how to use the 2.2.0 Handler Advice feature.
##Stateless Retry Advice Demo
This class (`StatelessRetryDemo`) demonstrates stateless retry.
By default, it runs with a simple default retry (3 tries, no backoff, no recovery)
Run with -Dspring.profiles.active=backoff to run with 3 tries, exponential backoff, no recovery
Run with -Dspring.profiles.active=recovery to run with 3 tries, no backoff, error message "recovery"
In each case enter 'fail n' where n is the number of times you want the service to fail.
e.g. 'fail 2' will succeed in each case on the third try, 'fail 3' will fail permanently after the third try.
##Stateful Retry Advice Demo
This class (`StatefulRetryDemo`) demonstrates stateful retry.
It is similar to the default version of the stateless retry but uses AMQP; you will see that the exception are thrown back to the container and the retries are re-delivered by AMQP.
##Circuit Breaker Advice Demo
This class (`CircuitBreakerDemo`) demonstrates the circuit breaker advice.
In this demo, the target service only succeeds in the last quarter of any minute (seconds 45-59). The breaker's threshold is set to 2, with the breaker going half-open once 15 seconds have elapsed since the last failure.
You can observe the function of the advice by entering a number of messages over time, and watch the resulting messages.
##Expression Evaluating Advice Demo
`FileTransferDeleteAfterSuccessDemo`
`FileTransferRenameAfterFailureDemo`
These classes show how to configure expressions to be evaluated after either succes, or failure; the application context is defined in expression-advice-context-xml.
It is a simulation of an application with a file inbound adapter -> ftp outbound adapter.
The advice on the outbound adapter evaluates an expression after the transfer - delete the file on success, rename it on failure.
The results of the expression evaluation are then logged (INFO or ERROR).
No real FTP is involved; mocks are used to simulate the transfer (success or failure).
In both cases simply add a file ending with .txt in ${java.io.tmpdir}/adviceDemo (e.g. touch /tmp/adviceDemo/x,txt) and the results will appear in the console.
##Running the Demos
In each case, run the main method in each of the demonstration classes.

View File

@@ -0,0 +1,92 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.springframework.integration.samples</groupId>
<artifactId>retry-and-more</artifactId>
<version>2.1.0.BUILD-SNAPSHOT</version>
<name>Samples (Intermediate) - Retry and More</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring.framework.version>3.1.2.RELEASE</spring.framework.version>
<spring.integration.version>2.2.0.RC1</spring.integration.version>
<log4j.version>1.2.16</log4j.version>
<junit.version>4.10</junit.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
<version>${spring.integration.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-amqp</artifactId>
<version>${spring.integration.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
<version>${spring.integration.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ftp</artifactId>
<version>${spring.integration.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.8.5</version>
</dependency>
<!-- test-scoped dependencies -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.framework.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.5</source>
<target>1.5</target>
<compilerArgument>-Xlint:all</compilerArgument>
<showWarnings>true</showWarnings>
<showDeprecation>false</showDeprecation>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2</version>
<configuration>
<mainClass>${java.main.class}</mainClass>
</configuration>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>repo.springsource.org.milestone</id>
<name>Spring Framework Maven Milestone Repository</name>
<url>https://repo.springsource.org/libs-milestone</url>
</repository>
</repositories>
</project>

View File

@@ -0,0 +1,58 @@
/*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.samples.advice;
import org.apache.log4j.Logger;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* @author Gary Russell
* @since 2.2
*
*/
public class CircuitBreakerDemo {
private static final Logger LOGGER = Logger.getLogger(CircuitBreakerDemo.class);
public static void main(String[] args) {
LOGGER.info("\n========================================================="
+ "\n "
+ "\n Welcome to Spring Integration! "
+ "\n "
+ "\n For more information please visit: "
+ "\n http://www.springsource.org/spring-integration "
+ "\n "
+ "\n=========================================================" );
final AbstractApplicationContext context =
new ClassPathXmlApplicationContext("classpath:META-INF/spring/integration/circuit-breaker-advice-context.xml");
context.registerShutdownHook();
LOGGER.info("\n========================================================="
+ "\n "
+ "\n This is the Circuit Breaker Sample - "
+ "\n "
+ "\n Please enter some text and press return a few times. "
+ "\n Service will succeed only in the last quarter "
+ "\n minute. Breaker will open after 2 failures and "
+ "\n will go half-open after 15 seconds. "
+ "\n "
+ "\n=========================================================" );
}
}

View File

@@ -0,0 +1,75 @@
/*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.samples.advice;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.annotation.Header;
/**
* @author Gary Russell
* @since 2.2
*
*/
public class ConditionalService {
private final Log logger = LogFactory.getLog(this.getClass());
private final Map<String, AtomicInteger> failCount = new HashMap<String, AtomicInteger>();
/**
* If this service receives a payload 'failnnn' where nnn is the number of failures,
* it will fail that many times for a given message id.
* @param payload
* @param id
*/
public void testRetry(String payload, @Header("failingId") String id) {
if (payload.startsWith("fail")) {
int failHowManyTimes = Integer.parseInt(payload.substring(4).trim());
AtomicInteger failures = failCount.get(id);
if (failures == null) {
failures = new AtomicInteger();
failCount.put(id, failures);
}
int currentFailures = failures.incrementAndGet();
if (currentFailures <= failHowManyTimes) {
String message = "Failure " + currentFailures + " of " + failHowManyTimes;
logger.info("Service failure " + message);
throw new RuntimeException(message);
}
}
logger.info("Service success for " + payload);
failCount.remove(id);
}
/**
* Succeeds only if called any time in the fourth quarter of any minute (seconds 45 thru 59)
* @param payload
*/
public void testCircuitBreaker(String payload) {
Calendar calendar = Calendar.getInstance();
if (calendar.get(Calendar.SECOND) < 45) {
logger.info("Service failure");
throw new RuntimeException("Service failed");
}
logger.info("Service success for " + payload);
}
}

View File

@@ -0,0 +1,79 @@
/*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.samples.advice;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.log4j.Logger;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.integration.file.remote.session.Session;
import org.springframework.integration.file.remote.session.SessionFactory;
/**
* @author Gary Russell
* @since 2.2
*
*/
public class FileTransferDeleteAfterSuccessDemo {
private static final Logger LOGGER = Logger.getLogger(FileTransferDeleteAfterSuccessDemo.class);
public static void main(String[] args) throws Exception {
LOGGER.info("\n========================================================="
+ "\n "
+ "\n Welcome to Spring Integration! "
+ "\n "
+ "\n For more information please visit: "
+ "\n http://www.springsource.org/spring-integration "
+ "\n "
+ "\n=========================================================" );
final AbstractApplicationContext context =
new ClassPathXmlApplicationContext("classpath:META-INF/spring/integration/expression-advice-context.xml");
context.registerShutdownHook();
@SuppressWarnings("unchecked")
SessionFactory<FTPFile> sessionFactory = context.getBean(SessionFactory.class);
SourcePollingChannelAdapter fileInbound = context.getBean(SourcePollingChannelAdapter.class);
@SuppressWarnings("unchecked")
Session<FTPFile> session = mock(Session.class);
when(sessionFactory.getSession()).thenReturn(session);
fileInbound.start();
LOGGER.info("\n========================================================="
+ "\n "
+ "\n This is the Expression Advice Sample - "
+ "\n "
+ "\n Press 'Enter' to terminate. "
+ "\n "
+ "\n Place a file in ${java.io.tmpdir}/adviceDemo ending "
+ "\n with .txt "
+ "\n The demo simulates a file transfer followed by the "
+ "\n Advice deleting the file; the result of the deletion "
+ "\n is logged. "
+ "\n "
+ "\n=========================================================" );
System.in.read();
System.exit(0);
}
}

View File

@@ -0,0 +1,75 @@
/*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.samples.advice;
import static org.mockito.Mockito.when;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.log4j.Logger;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.integration.file.remote.session.SessionFactory;
/**
* @author Gary Russell
* @since 2.2
*
*/
public class FileTransferRenameAfterFailureDemo {
private static final Logger LOGGER = Logger.getLogger(FileTransferRenameAfterFailureDemo.class);
public static void main(String[] args) throws Exception {
LOGGER.info("\n========================================================="
+ "\n "
+ "\n Welcome to Spring Integration! "
+ "\n "
+ "\n For more information please visit: "
+ "\n http://www.springsource.org/spring-integration "
+ "\n "
+ "\n=========================================================" );
final AbstractApplicationContext context =
new ClassPathXmlApplicationContext("classpath:META-INF/spring/integration/expression-advice-context.xml");
context.registerShutdownHook();
@SuppressWarnings("unchecked")
SessionFactory<FTPFile> sessionFactory = context.getBean(SessionFactory.class);
SourcePollingChannelAdapter fileInbound = context.getBean(SourcePollingChannelAdapter.class);
when(sessionFactory.getSession()).thenThrow(new RuntimeException("Force Failure"));
fileInbound.start();
LOGGER.info("\n========================================================="
+ "\n "
+ "\n This is the Expression Advice Sample - "
+ "\n "
+ "\n Press 'Enter' to terminate. "
+ "\n "
+ "\n Place a file in ${java.io.tmpdir}/adviceDemo ending "
+ "\n with .txt "
+ "\n The demo simulates a file transfer failure followed "
+ "\n by the Advice renaming the file; the result of the "
+ "\n rename is logged. "
+ "\n "
+ "\n=========================================================" );
System.in.read();
System.exit(0);
}
}

View File

@@ -0,0 +1,57 @@
/*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.samples.advice;
import org.apache.log4j.Logger;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* @author Gary Russell
* @since 2.2
*
*/
public class StatefulRetryDemo {
private static final Logger LOGGER = Logger.getLogger(StatefulRetryDemo.class);
public static void main(String[] args) {
LOGGER.info("\n========================================================="
+ "\n "
+ "\n Welcome to Spring Integration! "
+ "\n "
+ "\n For more information please visit: "
+ "\n http://www.springsource.org/spring-integration "
+ "\n "
+ "\n=========================================================" );
final AbstractApplicationContext context =
new ClassPathXmlApplicationContext("classpath:META-INF/spring/integration/stateful-retry-advice-context.xml");
context.registerShutdownHook();
LOGGER.info("\n========================================================="
+ "\n "
+ "\n This is the Stateful Sample - "
+ "\n "
+ "\n Please enter some text and press return. "
+ "\n 'fail 2' will fail twice, then succeed "
+ "\n 'fail 3' will fail and never succeed "
+ "\n "
+ "\n=========================================================" );
}
}

View File

@@ -0,0 +1,57 @@
/*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.samples.advice;
import org.apache.log4j.Logger;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* @author Gary Russell
* @since 2.2
*
*/
public class StatelessRetryDemo {
private static final Logger LOGGER = Logger.getLogger(StatelessRetryDemo.class);
public static void main(String[] args) {
LOGGER.info("\n========================================================="
+ "\n "
+ "\n Welcome to Spring Integration! "
+ "\n "
+ "\n For more information please visit: "
+ "\n http://www.springsource.org/spring-integration "
+ "\n "
+ "\n=========================================================" );
final AbstractApplicationContext context =
new ClassPathXmlApplicationContext("classpath:META-INF/spring/integration/stateless-retry-advice-context.xml");
context.registerShutdownHook();
LOGGER.info("\n========================================================="
+ "\n "
+ "\n This is the Stateless Sample - "
+ "\n "
+ "\n Please enter some text and press return. "
+ "\n 'fail 2' will fail twice, then succeed "
+ "\n 'fail 3' will fail and never succeed "
+ "\n "
+ "\n=========================================================" );
}
}

View File

@@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.springframework.org/schema/beans"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream-2.2.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<int-stream:stdin-channel-adapter channel="inputChannel">
<int:poller fixed-delay="1000" error-channel="failedChannel" />
</int-stream:stdin-channel-adapter>
<int:channel id="inputChannel"/>
<int:service-activator input-channel="inputChannel"
ref="conditionalService"
method="testCircuitBreaker">
<int:request-handler-advice-chain>
<ref bean="circuitBreakerAdvice" />
</int:request-handler-advice-chain>
</int:service-activator>
<int:channel id="failedChannel" />
<int:chain input-channel="failedChannel">
<int:transformer expression="'failed:' + payload.failedMessage.payload + ' with ' + payload.cause.message" />
<int-stream:stderr-channel-adapter append-newline="true"/>
</int:chain>
<bean id="conditionalService" class="org.springframework.integration.samples.advice.ConditionalService" />
<bean id="circuitBreakerAdvice" class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2" /> <!-- close after 2 failures -->
<property name="halfOpenAfter" value="15000" /> <!-- half open after 15 seconds -->
</bean>
</beans>

View File

@@ -0,0 +1,58 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.springframework.org/schema/beans"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp"
xsi:schemaLocation="http://www.springframework.org/schema/integration/ftp http://www.springframework.org/schema/integration/ftp/spring-integration-ftp-2.2.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file-2.2.xsd
http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream-2.2.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
<context:property-placeholder />
<int-file:inbound-channel-adapter
channel="inputChannel"
auto-startup="false"
auto-create-directory="true"
filename-pattern="*.txt"
directory="${java.io.tmpdir}/adviceDemo/">
<int:poller fixed-delay="500" error-channel="nullChannel" />
</int-file:inbound-channel-adapter>
<int:channel id="inputChannel" />
<int-ftp:outbound-channel-adapter
channel="inputChannel"
session-factory="mockSessionFactory"
remote-directory="foo">
<int-ftp:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
<property name="onSuccessExpression" value="payload.delete()" />
<property name="successChannel" ref="afterSuccessDeleteChannel" />
<property name="onFailureExpression" value="payload.renameTo(payload.absolutePath + '.failed.to.send')" />
<property name="failureChannel" ref="afterFailRenameChannel" />
</bean>
</int-ftp:request-handler-advice-chain>
</int-ftp:outbound-channel-adapter>
<bean id="mockSessionFactory" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.integration.file.remote.session.SessionFactory" />
</bean>
<int:transformer input-channel="afterSuccessDeleteChannel" output-channel="goodResultChannel"
expression="'Removal of ' + inputMessage.payload.absolutePath + ' after transfer, result:' +
payload" />
<int:transformer input-channel="afterFailRenameChannel" output-channel="badResultChannel"
expression="'Renaming of ' + payload.failedMessage.payload.absolutePath + ' after failed transfer, result:' +
payload.evaluationResult" />
<int:logging-channel-adapter id="goodResultChannel" level="INFO" />
<int:logging-channel-adapter id="badResultChannel" level="ERROR"/>
</beans>

View File

@@ -0,0 +1,101 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.springframework.org/schema/beans"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp-2.2.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream-2.2.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.1.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- From STDIN To RabbitMQ -->
<int-stream:stdin-channel-adapter id="consoleIn" channel="toRabbit" >
<int:poller fixed-delay="1000" max-messages-per-poll="1" />
</int-stream:stdin-channel-adapter>
<int:channel id="toRabbit" />
<int:chain input-channel="toRabbit">
<int:header-enricher>
<int:header name="uniqueId" expression="T(System).nanoTime()"/>
</int:header-enricher>
<int-amqp:outbound-channel-adapter mapped-request-headers="uniqueId"
amqp-template="amqpTemplate" exchange-name="si.test.exchange"
routing-key="si.test.binding"/>
</int:chain>
<!-- From RabbitMQ To inputChannel -->
<int-amqp:inbound-channel-adapter channel="inputChannel"
mapped-request-headers="uniqueId"
queue-names="si.test.queue"
connection-factory="connectionFactory" />
<int:channel id="inputChannel"/>
<int:header-enricher input-channel="inputChannel" output-channel="toServiceChannel">
<int:header name="failingId" expression="headers['uniqueId'].toString()" />
</int:header-enricher>
<int:service-activator input-channel="toServiceChannel"
ref="conditionalService"
method="testRetry">
<int:request-handler-advice-chain>
<ref bean="retryAdvice" />
</int:request-handler-advice-chain>
</int:service-activator>
<int:channel id="failedChannel" />
<int:chain input-channel="failedChannel">
<int:transformer expression="'permanently failed:' + payload.failedMessage.payload" />
<int-stream:stderr-channel-adapter append-newline="true"/>
</int:chain>
<bean id="conditionalService" class="org.springframework.integration.samples.advice.ConditionalService" />
<!-- Rabbit infrastructure -->
<rabbit:connection-factory id="connectionFactory" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="si.test.queue" />
<rabbit:direct-exchange name="si.test.exchange">
<rabbit:bindings>
<rabbit:binding queue="si.test.queue" key="si.test.binding" />
</rabbit:bindings>
</rabbit:direct-exchange>
<beans profile="default">
<bean id="retryAdvice" class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice">
<property name="recoveryCallback">
<bean class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer">
<constructor-arg ref="recoveryChannel" />
</bean>
</property>
<property name="retryStateGenerator">
<bean class="org.springframework.integration.handler.advice.SpelExpressionRetryStateGenerator">
<constructor-arg value="headers['uniqueId']"/>
</bean>
</property>
</bean>
<int:channel id="recoveryChannel" />
<int:chain input-channel="recoveryChannel">
<int:transformer expression="'permanently failed:' + payload.failedMessage.payload + ' handled by recovery'" />
<int-stream:stderr-channel-adapter append-newline="true"/>
</int:chain>
</beans>
</beans>

View File

@@ -0,0 +1,79 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.springframework.org/schema/beans"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream-2.2.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<int-stream:stdin-channel-adapter channel="inputChannel">
<int:poller fixed-delay="1000" error-channel="failedChannel" />
</int-stream:stdin-channel-adapter>
<int:channel id="inputChannel"/>
<int:header-enricher input-channel="inputChannel" output-channel="toServiceChannel">
<int:header name="failingId" expression="headers['id'].toString()" />
</int:header-enricher>
<int:service-activator input-channel="toServiceChannel"
ref="conditionalService"
method="testRetry">
<int:request-handler-advice-chain>
<ref bean="retryAdvice" />
</int:request-handler-advice-chain>
</int:service-activator>
<int:channel id="failedChannel" />
<int:chain input-channel="failedChannel">
<int:transformer expression="'permanently failed:' + payload.failedMessage.payload" />
<int-stream:stderr-channel-adapter append-newline="true"/>
</int:chain>
<bean id="conditionalService" class="org.springframework.integration.samples.advice.ConditionalService" />
<!-- Default retry advice 3 tries, no backoff, no recovery;
final failure will go to poller's error channel -->
<beans profile="default">
<bean id="retryAdvice" class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice" />
</beans>
<!-- Retry advice, template with default 3 tries, exponential backoff, no recovery;
final failure will go to poller's error channel -->
<beans profile="backoff">
<bean id="retryAdvice" class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice">
<property name="retryTemplate">
<bean class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="2000" />
<property name="multiplier" value="2" />
</bean>
</property>
</bean>
</property>
</bean>
</beans>
<!-- Retry advice, template with default 3 tries, no backoff, recovery;
final handled by recovery instead of going to the poller's error channel -->
<beans profile="recovery">
<bean id="retryAdvice" class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice">
<property name="recoveryCallback">
<bean class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer">
<constructor-arg ref="recoveryChannel" />
</bean>
</property>
</bean>
<int:channel id="recoveryChannel" />
<int:chain input-channel="recoveryChannel">
<int:transformer expression="'permanently failed:' + payload.failedMessage.payload + ' handled by recovery'" />
<int-stream:stderr-channel-adapter append-newline="true"/>
</int:chain>
</beans>
</beans>

View File

@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<!-- Appenders -->
<appender name="console" class="org.apache.log4j.ConsoleAppender">
<param name="Target" value="System.out" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d{HH:mm:ss.SSS} %-5p [%t][%c] %m%n" />
</layout>
</appender>
<!-- Loggers -->
<logger name="org.springframework">
<level value="warn" />
</logger>
<logger name="org.springframework.integration.endpoint.SourcePollingChannelAdapter">
<level value="warn" />
</logger>
<logger name="org.springframework.integration.dispatcher.BroadcastingDispatcher">
<level value="warn" />
</logger>
<logger name="org.springframework.integration">
<level value="warn" />
</logger>
<logger name="org.springframework.integration.handler.LoggingHandler">
<level value="info" />
</logger>
<logger name="org.springframework.integration.samples">
<level value="info" />
</logger>
<logger name="org.springframework.retry">
<level value="warn" />
</logger>
<!-- Root Logger -->
<root>
<priority value="warn" />
<appender-ref ref="console" />
</root>
</log4j:configuration>