Barrier Error Handling

This commit is contained in:
Gary Russell
2015-11-28 11:24:33 -05:00
parent 7ec8687efc
commit 3c855f8204
5 changed files with 227 additions and 4 deletions

View File

@@ -1,8 +1,9 @@
Barrier Sample
==============
This example demonstrates the use of a process barrier component to suspend a thread until some asynchronous operation
completes. It uses an **HTTP Inbound Gateway**, splits the request, sends the splits to rabbitmq and then waits for
This example demonstrates the use of a process barrier component to suspend a thread until some asynchronous operation
completes. The first example uses an **HTTP Inbound Gateway**, splits the request, sends the splits to rabbitmq and then
waits for
the publisher confirms. Finally, the results are returned to the caller.
The sample is a Spring Boot application that loads 2 contexts:
@@ -35,10 +36,38 @@ This will package the application and run it using the [Gradle Application Plugi
In STS (Eclipse), go to package **org.springframework.integration.samples.barrier**, right-click **Application** and select **Run as** --> **Java Application** (or Spring Boot Application).
### Output
The gateway (**client**) initiates a simple request posting "A,B,C" to the **server** and the **server** responds with the results.
You should see the following output from the server:
++++++++++++ Replied with: Result: A: ack=true, B: ack=true, C: ack=true ++++++++++++
The second example uses a simple gateway to launch some asynchronous tasks and waits for those tasks to complete.
It shows how you might return an exception to the caller if one or more of those tasks fail.
An aggregator is used to aggregate the results; if there are no errors, the results are returned; if one or more
errors occurred, an exception is sent to release the barrier; this is thrown to the caller and has all the consolidated
results in a property.
You can run this example from an IDE, such as STS using the technique above; in this case, the class is
**ErrorHandlingApplication** in the **org.springframework.integration.samples.barrier** package.
It sends a list of integers to the flow:
[2, 0, 2, 0, 2]
The zeros should fail and in stderr you should see the results:
ConsolidatedResultsException
[results=
[5
org.springframework.integration.transformer.MessageTransformationException:
Failed to transform Message; nested exception is org.springframework.messaging.MessageHandlingException:
Expression evaluation failed: 10 / payload; nested exception is java.lang.ArithmeticException: / by zero
5
org.springframework.integration.transformer.MessageTransformationException:
Failed to transform Message; nested exception is org.springframework.messaging.MessageHandlingException:
Expression evaluation failed: 10 / payload; nested exception is java.lang.ArithmeticException: / by zero
5]
]

View File

@@ -0,0 +1,58 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.samples.barrier2;
import java.util.Collection;
import org.springframework.messaging.MessagingException;
/**
* @author Gary Russell
* @since 4.2
*
*/
public class Aggregator {
public Object aggregate(Collection<Object> results) {
for (Object o : results) {
if (o instanceof MessagingException) {
return new ConsolidatedResultsException(results);
}
}
return results;
}
@SuppressWarnings("serial")
public static class ConsolidatedResultsException extends RuntimeException {
private final Collection<Object> results;
public ConsolidatedResultsException(Collection<Object> results) {
this.results = results;
}
public Collection<Object> getResults() {
return results;
}
@Override
public String toString() {
return "ConsolidatedResultsException\n[results=\n" + results.toString().replaceAll(", ", "\n") + "\n]";
}
}
}

View File

@@ -0,0 +1,48 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.samples.barrier2;
import java.util.Arrays;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.ImportResource;
/**
* @author Gary Russell
* @since 4.2
*/
@SpringBootApplication
@ImportResource("/META-INF/spring/integration/errorhandling-context.xml")
public class ErrorHandlingApplication {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext test = new SpringApplicationBuilder(ErrorHandlingApplication.class)
.web(false)
.run(args);
Gateway gateway = test.getBean(Gateway.class);
try {
gateway.process(Arrays.asList(2, 0, 2, 0, 2), "foo");
}
catch (Exception e) {
System.err.println(e.toString());
}
test.close();
}
}

View File

@@ -0,0 +1,29 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.samples.barrier2;
import java.util.Collection;
/**
* @author Gary Russell
* @since 4.2
*
*/
public interface Gateway {
public void process(Collection<Integer> numbers, String correlationId);
}

View File

@@ -0,0 +1,59 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.2.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd">
<int:gateway default-request-channel="processChannel"
service-interface="org.springframework.integration.samples.barrier2.Gateway"
default-payload-expression="#args[0]">
<int:default-header name="barrierCorrelation" expression="#args[1]" />
</int:gateway>
<int:publish-subscribe-channel id="processChannel" />
<int:splitter input-channel="processChannel" output-channel="process" order="1" />
<int:channel id="process">
<int:queue />
</int:channel>
<int:chain input-channel="process" output-channel="aggregatorChannel">
<int:transformer expression="10 / payload" /> <!-- exception on 0 payloads -->
<int:poller fixed-delay="1000" error-channel="errors" task-executor="exec" />
</int:chain>
<task:executor id="exec"/>
<!-- Suspend the calling thread until the aggregation is complete -->
<int:barrier id="barrier" input-channel="processChannel" order="2"
correlation-strategy-expression="headers['barrierCorrelation']"
timeout="10000" />
<!-- Aggregate the results and send the result to the barrier release channel -->
<int:aggregator input-channel="aggregatorChannel" output-channel="release">
<bean class="org.springframework.integration.samples.barrier2.Aggregator" />
</int:aggregator>
<int:channel id="release" />
<int:outbound-channel-adapter channel="release" ref="barrier.handler" method="trigger" />
<!-- Error flow; send exceptions to the aggregator; restore the correlation data from the failedMessage -->
<int:chain input-channel="errors" output-channel="aggregatorChannel">
<int:header-enricher>
<int:correlation-id expression="payload.failedMessage.headers.correlationId" />
<int:header name="sequenceSize" expression="payload.failedMessage.headers.sequenceSize" />
<int:header name="sequenceNumber" expression="payload.failedMessage.headers.sequenceNumber" />
</int:header-enricher>
</int:chain>
</beans>