From 0a4d85b9732708f8b16a840a3566eb602b6d2d88 Mon Sep 17 00:00:00 2001 From: David Turanski Date: Sun, 28 Aug 2011 17:47:04 -0400 Subject: [PATCH] Added error channel, modified error handling to be more consistent with other SI channel adapters. Added check to ensure all flow MessageChannels are private" --- .../integration/flow/Flow.java | 52 +-- .../config/FlowMessageHandlerFactoryBean.java | 13 +- .../integration/flow/config/FlowUtils.java | 157 +++++---- .../config/xml/FlowOutboundGatewayParser.java | 1 + .../flow/handler/FlowMessageHandler.java | 89 +++-- src/main/resources/META-INF/spring.schemas | 1 - .../config/spring-integration-flow-2.0.xsd | 197 ----------- .../config/spring-integration-flow-2.1.xsd | 307 ++++++++++-------- .../flow/config/xml/FlowWithErrorTest.java | 46 ++- .../config/xml/TransactionalFlowTest.java | 6 +- .../resources/FlowContextTest-context.xml | 3 +- .../resources/FlowWithErrorTest-context.xml | 9 +- .../flows/subflow5/subflow5-context.xml | 3 +- .../TransactionalFlowTest-context.xml | 3 +- src/test/resources/log4j.xml | 4 +- 15 files changed, 391 insertions(+), 500 deletions(-) delete mode 100644 src/main/resources/org/springframework/integration/flow/config/spring-integration-flow-2.0.xsd diff --git a/src/main/java/org/springframework/integration/flow/Flow.java b/src/main/java/org/springframework/integration/flow/Flow.java index de61a92..5841158 100644 --- a/src/main/java/org/springframework/integration/flow/Flow.java +++ b/src/main/java/org/springframework/integration/flow/Flow.java @@ -1,8 +1,10 @@ package org.springframework.integration.flow; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Properties; +import java.util.Set; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.logging.Log; @@ -121,6 +123,7 @@ public class Flow implements InitializingBean, BeanNameAware, ChannelResolver, A Assert.notEmpty(configLocations, "configLocations cannot be empty"); + /* * create a child application context */ @@ -231,54 +234,33 @@ public class Flow implements InitializingBean, BeanNameAware, ChannelResolver, A MutablePropertySources propertySources = flowContext.getEnvironment().getPropertySources(); propertySources.addLast(propertySource); } - } private void validatePortMapping() { Assert.notEmpty(this.flowConfiguration.getPortConfigurations(), "flow configuration contains no port configurations"); + /* + * Verify that no channels in the flow context are shared in the parent context + */ List errors = new ArrayList(); - for (PortConfiguration portConfiguration : this.flowConfiguration.getPortConfigurations()) { - String inputChannelName = (String) portConfiguration.getInputChannel(); - validateFlowChannelDefinition(inputChannelName, errors, false); - - for (String outputPortName : portConfiguration.getOutputPortNames()) { - String outputChannelName = (String) portConfiguration.getOutputChannel(outputPortName); - validateFlowChannelDefinition(outputChannelName, errors, true); + + List channelNames = Arrays.asList(this.flowContext.getBeanNamesForType(MessageChannel.class)); + + Set referencedMessageChannels = FlowUtils.getReferencedMessageChannels(this.flowContext.getBeanFactory(),5); + + for (String referencedMessageChannel: referencedMessageChannels) { + if (!channelNames.contains(referencedMessageChannel)) { + errors.add("Flow references channel [" + referencedMessageChannel + "] defined in the parent context. " + + "This channel should be explicitly defined in the flow context"); } } + if (errors.size() > 0 ) { - throw new BeanDefinitionValidationException("\n"+StringUtils.arrayToDelimitedString(errors.toArray(),"\n")); } } - - /* - * If flow context does not contain the bean definition then the definition - * comes from the parent context. The flow should should still work with a - * 'global' PublishSubscribeChannel output channel - */ - private void validateFlowChannelDefinition(String channelName, List errors, boolean allowPubSub) { - - MessageChannel channel = this.flowContext.getBean(channelName, MessageChannel.class); - - if (!this.flowContext.containsBeanDefinition(channelName)) { - if (channel instanceof PublishSubscribeChannel && allowPubSub) { - if (logger.isDebugEnabled()) { - logger.warn("Flow '" + this.flowId +"'" + - " is sharing the publish-subscribe channel '" + channelName +"'" + - " with the parent context."); - } - } else { - errors.add("The flow channel '" - + channelName - + "' in flow '" - + this.flowId - + "' conflicts with a bean definition in the parent context. It must be explicitly declared in the flow'"); - } - } - } + private void bridgeMessagingPorts() { /* diff --git a/src/main/java/org/springframework/integration/flow/config/FlowMessageHandlerFactoryBean.java b/src/main/java/org/springframework/integration/flow/config/FlowMessageHandlerFactoryBean.java index edd2a55..373f7e5 100644 --- a/src/main/java/org/springframework/integration/flow/config/FlowMessageHandlerFactoryBean.java +++ b/src/main/java/org/springframework/integration/flow/config/FlowMessageHandlerFactoryBean.java @@ -40,6 +40,8 @@ public class FlowMessageHandlerFactoryBean extends AbstractSimpleMessageHandlerF private volatile Flow flow; private volatile String inputPortName; + + private volatile MessageChannel errorChannel; private volatile long timeout; @@ -54,6 +56,8 @@ public class FlowMessageHandlerFactoryBean extends AbstractSimpleMessageHandlerF FlowMessageHandler flowMessageHandler = new FlowMessageHandler(flowInputChannel, flow.getFlowOutputChannel(), timeout); + flowMessageHandler.setErrorChannel(this.errorChannel); + return flowMessageHandler; } @@ -81,6 +85,14 @@ public class FlowMessageHandlerFactoryBean extends AbstractSimpleMessageHandlerF public void setTimeout(long timeout) { this.timeout = timeout; } + + /** + * + * @param errorChannel + */ + public void setErrorChannel(MessageChannel errorChannel) { + this.errorChannel = errorChannel; + } @Override @@ -97,7 +109,6 @@ public class FlowMessageHandlerFactoryBean extends AbstractSimpleMessageHandlerF this.flowConfiguration = this.flow.getFlowConfiguration().getConfigurationForInputPort( this.inputPortName); } - } } diff --git a/src/main/java/org/springframework/integration/flow/config/FlowUtils.java b/src/main/java/org/springframework/integration/flow/config/FlowUtils.java index 3e1cab8..58ba810 100644 --- a/src/main/java/org/springframework/integration/flow/config/FlowUtils.java +++ b/src/main/java/org/springframework/integration/flow/config/FlowUtils.java @@ -17,9 +17,12 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; -import org.apache.commons.lang.StringUtils; import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.support.BeanDefinitionRegistry; import org.springframework.integration.MessageChannel; @@ -34,73 +37,107 @@ import org.springframework.util.ResourceUtils; * */ public class FlowUtils { - - private FlowUtils() {} - + + private FlowUtils() { + } + /** - * Create a bridge - * - * @param inputChannel - * @param outputChannel - */ + * Create a bridge + * + * @param inputChannel + * @param outputChannel + */ - public static void bridgeChannels(SubscribableChannel inputChannel, MessageChannel outputChannel) { - BridgeHandler bridgeHandler = new BridgeHandler(); - bridgeHandler.setOutputChannel(outputChannel); - inputChannel.subscribe(bridgeHandler); - } + public static void bridgeChannels(SubscribableChannel inputChannel, MessageChannel outputChannel) { + BridgeHandler bridgeHandler = new BridgeHandler(); + bridgeHandler.setOutputChannel(outputChannel); + inputChannel.subscribe(bridgeHandler); + } - /** - * Register a bean with "flow" prefix - * - * @param beanDefinition - * @param registry - * @return - */ - public static String registerBeanDefinition(BeanDefinition beanDefinition, BeanDefinitionRegistry registry) { - String beanName = BeanDefinitionReaderUtils.generateBeanName(beanDefinition, registry); - beanName = "flow." + beanName; - String strIndex = StringUtils.substringAfter(beanName, "#"); - int index = Integer.valueOf(strIndex); - while (registry.isBeanNameInUse(beanName)) { - index++; - beanName = beanName.replaceAll("#\\d$", "#" + (index)); - } - registry.registerBeanDefinition(beanName, beanDefinition); - return beanName; - } + /** + * Register a bean with "flow" prefix + * + * @param beanDefinition + * @param registry + * @return + */ + public static String registerBeanDefinition(BeanDefinition beanDefinition, BeanDefinitionRegistry registry) { + String beanName = BeanDefinitionReaderUtils.generateBeanName(beanDefinition, registry); + beanName = "flow." + beanName; + String strIndex = org.apache.commons.lang.StringUtils.substringAfter(beanName, "#"); + int index = Integer.valueOf(strIndex); + while (registry.isBeanNameInUse(beanName)) { + index++; + beanName = beanName.replaceAll("#\\d$", "#" + (index)); + } + registry.registerBeanDefinition(beanName, beanDefinition); + return beanName; + } - /** - * Read the flow documentation resource into a String if it exists. - * The location is classpath:META-INF/spring/integration/flows/[flowId]/flow.doc - * @param flowId the flow id - * @return the documentation - */ - public static String getDocumentation(String flowId) { + /** + * Read the flow documentation resource into a String if it exists. The + * location is classpath:META-INF/spring/integration/flows/[flowId]/flow.doc + * @param flowId the flow id + * @return the documentation + */ + public static String getDocumentation(String flowId) { - String path = String.format("classpath:META-INF/spring/integration/flows/%s/flow.doc", flowId); + String path = String.format("classpath:META-INF/spring/integration/flows/%s/flow.doc", flowId); - try { - File file = ResourceUtils.getFile(path); + try { + File file = ResourceUtils.getFile(path); - BufferedReader br = new BufferedReader(new FileReader(file)); + BufferedReader br = new BufferedReader(new FileReader(file)); - String line; - StringBuilder result = new StringBuilder(); - while ((line = br.readLine()) != null) { - result.append(line).append("\n"); - } - - br.close(); - - return result.toString(); + String line; + StringBuilder result = new StringBuilder(); + while ((line = br.readLine()) != null) { + result.append(line).append("\n"); + } - } catch (FileNotFoundException e) { - return "no help available"; - } catch (IOException e) { - e.printStackTrace(); - return "no help available"; - } - } + br.close(); + + return result.toString(); + + } + catch (FileNotFoundException e) { + return "no help available"; + } + catch (IOException e) { + e.printStackTrace(); + return "no help available"; + } + } + + public static Set getReferencedMessageChannels(ConfigurableListableBeanFactory beanFactory, int max) { + String[] beans = beanFactory.getBeanNamesForType(Object.class); + Set messageChannels = new HashSet(); + _getReferencedMessageChannels(beanFactory, beans, messageChannels); + return Collections.unmodifiableSet(messageChannels); + + } + + private static void _getReferencedMessageChannels(ConfigurableListableBeanFactory beanFactory, String[] beans, + Set messageChannels) { + + for (String bean : beans) { + if (!bean.startsWith("(inner bean)") && !bean.equals("nullChannel")) { + Class clazz = null; + if (beanFactory.containsBean(bean)) { + clazz = beanFactory.getType(bean); + } + + if (clazz != null) { + if (MessageChannel.class.isAssignableFrom(clazz)) { + messageChannels.add(bean); + } + String[] dependencies = beanFactory.getDependenciesForBean(bean); + if (dependencies.length > 0) { + _getReferencedMessageChannels(beanFactory, dependencies, messageChannels); + } + } + } + } + } } diff --git a/src/main/java/org/springframework/integration/flow/config/xml/FlowOutboundGatewayParser.java b/src/main/java/org/springframework/integration/flow/config/xml/FlowOutboundGatewayParser.java index 22c78ec..cf4c430 100644 --- a/src/main/java/org/springframework/integration/flow/config/xml/FlowOutboundGatewayParser.java +++ b/src/main/java/org/springframework/integration/flow/config/xml/FlowOutboundGatewayParser.java @@ -39,6 +39,7 @@ public class FlowOutboundGatewayParser extends AbstractConsumerEndpointParser { IntegrationNamespaceUtils.setValueIfAttributeDefined(flowHandlerBuilder, element, "input-port","inputPortName"); IntegrationNamespaceUtils.setValueIfAttributeDefined(flowHandlerBuilder, element, "timeout"); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(flowHandlerBuilder, element, "error-channel"); return flowHandlerBuilder; } diff --git a/src/main/java/org/springframework/integration/flow/handler/FlowMessageHandler.java b/src/main/java/org/springframework/integration/flow/handler/FlowMessageHandler.java index b570343..dc1f90a 100644 --- a/src/main/java/org/springframework/integration/flow/handler/FlowMessageHandler.java +++ b/src/main/java/org/springframework/integration/flow/handler/FlowMessageHandler.java @@ -34,71 +34,85 @@ import org.springframework.integration.support.MessageBuilder; /** * A MessageHandler for Handling Flow input and output. Sends messages on its - * input channel to the flow input channel and replies with the flow output (if there - * is one) to its output channel. + * input channel to the flow input channel and replies with the flow output (if + * there is one) to its output channel. + * + * Internally creates a subscriber to a PublishSubscribeChannel automatically + * created for the flow. Since all FlowMessageHandler instances subscribe to + * this channel, a unique flow conversationId is used to correlate flow input + * and output messages + * + * The output message contains a FLOW_OUTPUT_PORT_HEADER identifying which flow + * output port produced the message + * @see FlowUtils + * + * Error handling is done in a standard way. If the flow includes an error + * channel which is bound to an output port, the handler will send the + * ErrorMessage to the outputChannel. If the flow throws an exception, the + * handler will create an ErrorMessage and send it to the outputChannel * - * Internally creates a subscriber to a PublishSubscribeChannel automatically created for - * the flow. Since all FlowMessageHandler instances subscribe to this channel, a unique - * flow conversationId is used to correlate flow input and output messages - * - * The output message contains a FLOW_OUTPUT_PORT_HEADER identifying which flow output port produced the message - * @see FlowUtils - * - * Error handling is done in a standard way. If the flow includes an error channel which is bound to an output port, the - * handler will send the ErrorMessage to the outputChannel. If the flow throws an exception, the handler will create an - * ErrorMessage and send it to the outputChannel - * * @author David Turanski * */ public class FlowMessageHandler extends AbstractReplyProducingMessageHandler { - private static Log log = LogFactory.getLog(FlowMessageHandler.class); private final MessageChannel flowInputChannel; - + private final SubscribableChannel flowOutputChannel; + private volatile MessageChannel errorChannel; + private final long timeout; + /** * * @param flowInputChannel the Flow input channel - * @param flowOutputChannel a PublishSubscribeChannel internally created and bridged to all flow output channels + * @param flowOutputChannel a PublishSubscribeChannel internally created and + * bridged to all flow output channels * @param timeout the send timeout duration */ public FlowMessageHandler(MessageChannel flowInputChannel, SubscribableChannel flowOutputChannel, long timeout) { this.flowInputChannel = flowInputChannel; - this.flowOutputChannel = flowOutputChannel; + this.flowOutputChannel = flowOutputChannel; this.timeout = timeout; } - + + public void setErrorChannel(MessageChannel errorChannel) { + this.errorChannel = errorChannel; + } @Override protected Object handleRequestMessage(Message requestMessage) { UUID conversationId = requestMessage.getHeaders().getId(); - Map flowConversationIdHeader = Collections.singletonMap(FlowConstants.FLOW_CONVERSATION_ID_HEADER, - (Object) conversationId); + Map flowConversationIdHeader = Collections.singletonMap( + FlowConstants.FLOW_CONVERSATION_ID_HEADER, (Object) conversationId); Message message = MessageBuilder.fromMessage(requestMessage).copyHeaders(flowConversationIdHeader) .build(); - try { - + ResponseMessageHandler responseMessageHandler = new ResponseMessageHandler(conversationId); flowOutputChannel.subscribe(responseMessageHandler); - flowInputChannel.send(message,timeout); + flowInputChannel.send(message, timeout); return responseMessageHandler.getResponse(); - + } catch (MessagingException me) { log.error(me.getMessage(), me); - if (conversationId.equals(me.getFailedMessage().getHeaders().get(FlowConstants.FLOW_CONVERSATION_ID_HEADER))) { - return new ErrorMessage(me,Collections.singletonMap(FlowConstants.FLOW_OUTPUT_PORT_HEADER, - (Object)FlowConstants.FLOW_HANDLER_EXCEPTION_HEADER_VALUE)); + if (conversationId + .equals(me.getFailedMessage().getHeaders().get(FlowConstants.FLOW_CONVERSATION_ID_HEADER))) { + if (errorChannel != null) { + errorChannel.send(new ErrorMessage(me, Collections.singletonMap( + FlowConstants.FLOW_OUTPUT_PORT_HEADER, + (Object) FlowConstants.FLOW_HANDLER_EXCEPTION_HEADER_VALUE))); + } else { + throw me; + } } } return null; @@ -109,7 +123,9 @@ public class FlowMessageHandler extends AbstractReplyProducingMessageHandler { */ private static class ResponseMessageHandler implements MessageHandler { private final UUID conversationId; + private volatile Message response; + public ResponseMessageHandler(UUID conversationId) { this.conversationId = conversationId; } @@ -123,19 +139,24 @@ public class FlowMessageHandler extends AbstractReplyProducingMessageHandler { */ @Override public void handleMessage(Message message) throws MessagingException { - + if (conversationId.equals(message.getHeaders().get(FlowConstants.FLOW_CONVERSATION_ID_HEADER))) { this.response = message; - } else { - if (message instanceof ErrorMessage){ + } + else { + /* + * Response from flow's ErrorChannel which is mapped to an output port. + */ + if (message instanceof ErrorMessage) { MessagingException me = (MessagingException) message.getPayload(); - if (conversationId.equals(me.getFailedMessage().getHeaders().get(FlowConstants.FLOW_CONVERSATION_ID_HEADER))) { - this.response = message; + if (conversationId.equals(me.getFailedMessage().getHeaders() + .get(FlowConstants.FLOW_CONVERSATION_ID_HEADER))) { + this.response = message; } } - } + } } - + public Message getResponse() { return this.response; } diff --git a/src/main/resources/META-INF/spring.schemas b/src/main/resources/META-INF/spring.schemas index a348fc1..343df27 100644 --- a/src/main/resources/META-INF/spring.schemas +++ b/src/main/resources/META-INF/spring.schemas @@ -1,3 +1,2 @@ -http\://www.springframework.org/schema/integration/flow/spring-integration-flow-2.0.xsd=org/springframework/integration/flow/config/spring-integration-flow-2.0.xsd http\://www.springframework.org/schema/integration/flow/spring-integration-flow-2.1.xsd=org/springframework/integration/flow/config/spring-integration-flow-2.1.xsd http\://www.springframework.org/schema/integration/flow/spring-integration-flow.xsd=org/springframework/integration/flow/config/spring-integration-flow-2.1.xsd \ No newline at end of file diff --git a/src/main/resources/org/springframework/integration/flow/config/spring-integration-flow-2.0.xsd b/src/main/resources/org/springframework/integration/flow/config/spring-integration-flow-2.0.xsd deleted file mode 100644 index 0316d8c..0000000 --- a/src/main/resources/org/springframework/integration/flow/config/spring-integration-flow-2.0.xsd +++ /dev/null @@ -1,197 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - The receiving Message channel of this endpoint - - - - - - - - - - - - - The receiving Message channel of this endpoint - - - - - - - - - - - - - - \ No newline at end of file diff --git a/src/main/resources/org/springframework/integration/flow/config/spring-integration-flow-2.1.xsd b/src/main/resources/org/springframework/integration/flow/config/spring-integration-flow-2.1.xsd index 1f7acfc..481f40e 100644 --- a/src/main/resources/org/springframework/integration/flow/config/spring-integration-flow-2.1.xsd +++ b/src/main/resources/org/springframework/integration/flow/config/spring-integration-flow-2.1.xsd @@ -1,177 +1,198 @@ + xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:bean="http://www.springframework.org/schema/beans" + xmlns:int="http://www.springframework.org/schema/integration" + xmlns:tool="http://www.springframework.org/schema/tool" + targetNamespace="http://www.springframework.org/schema/integration/flow" + elementFormDefault="qualified" attributeFormDefault="unqualified"> - - - + + + - - - + + - - - - - - - - + + + + + + + - - - - - + + + + - - - - - + + + + - - - - - + + + + - - - - - + + + + - - - - + + + + - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Specifies the order for invocation when this endpoint is connected as a + subscriber to a channel. This is particularly relevant when that + channel + is using a "failover" dispatching strategy. It has no effect when + this + endpoint itself is a Polling Consumer for a channel with a queue. + + + + + + + - - - - - - + + + + + + - - - - - + + + + + - - - + + - - - - - + + + + + - - + + - - - - - - - - - - - - - - - The receiving Message channel of this endpoint + + + + + + + + + + + + + + + The receiving Message channel of this endpoint - - - - - - - - - - - - The receiving Message channel of this endpoint - - - - + + - - - + + + + + + + + The receiving Message channel of this endpoint + + + + + + + + - - - - + + + + \ No newline at end of file diff --git a/src/test/java/org/springframework/integration/flow/config/xml/FlowWithErrorTest.java b/src/test/java/org/springframework/integration/flow/config/xml/FlowWithErrorTest.java index b5e66f4..0affcbf 100644 --- a/src/test/java/org/springframework/integration/flow/config/xml/FlowWithErrorTest.java +++ b/src/test/java/org/springframework/integration/flow/config/xml/FlowWithErrorTest.java @@ -28,7 +28,6 @@ import org.springframework.integration.core.MessageHandler; import org.springframework.integration.core.PollableChannel; import org.springframework.integration.core.SubscribableChannel; import org.springframework.integration.message.GenericMessage; - /** * @@ -39,25 +38,25 @@ import org.springframework.integration.message.GenericMessage; public class FlowWithErrorTest { @Test - public void testExceptionThrown(){ + public void testFlowThrowsExceptionWithGatewayErrorChannel(){ ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:/FlowWithErrorTest-context.xml"); - MessageChannel input = applicationContext.getBean("inputC",MessageChannel.class); - PollableChannel output = applicationContext.getBean("outputC",PollableChannel.class); - Message msg = new GenericMessage("hello"); - input.send(msg); - - Message reply = output.receive(100); - assertNotNull(reply); - assertTrue(reply.getPayload() instanceof MessagingException); + MessageChannel inputChannel = applicationContext.getBean("inputC",MessageChannel.class); + SubscribableChannel errorChannel = applicationContext.getBean("errorChannel",SubscribableChannel.class); + Message msg = new GenericMessage("hello"); + Handler handler = new Handler(); + errorChannel.subscribe(handler); + inputChannel.send(msg); + assertTrue(handler.gotResponse); + } @Test public void testDirectCallWithErrorChannel(){ ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:/META-INF/spring/integration/flows/subflow5/subflow5-context.xml"); - MessageChannel input = applicationContext.getBean("subflow-input",MessageChannel.class); - SubscribableChannel error = applicationContext.getBean("errorChannel",SubscribableChannel.class); + MessageChannel inputChannel = applicationContext.getBean("subflow-input",MessageChannel.class); + SubscribableChannel errorChannel = applicationContext.getBean("errorChannel",SubscribableChannel.class); - error.subscribe(new MessageHandler() { + errorChannel.subscribe(new MessageHandler() { @Override public void handleMessage(Message message) throws MessagingException { @@ -66,23 +65,38 @@ public class FlowWithErrorTest { }); Message msg = new GenericMessage("hello"); - assertTrue(input.send(msg)); + assertTrue(inputChannel.send(msg)); } @Test public void testWithErrorChannel(){ ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:/FlowWithErrorTest-context.xml"); - MessageChannel input = applicationContext.getBean("inputC1",MessageChannel.class); + MessageChannel inputChannel = applicationContext.getBean("inputC1",MessageChannel.class); PollableChannel output = applicationContext.getBean("outputC1",PollableChannel.class); Message msg = new GenericMessage("hello"); - input.send(msg); + inputChannel.send(msg); Message reply = output.receive(100); assertNotNull(reply); assertTrue(reply.getPayload() instanceof MessagingException); } + private static class Handler implements MessageHandler { + public boolean gotResponse; + @SuppressWarnings("unused") + public Message message; + /* (non-Javadoc) + * @see org.springframework.integration.core.MessageHandler#handleMessage(org.springframework.integration.Message) + */ + @Override + public void handleMessage(Message message) throws MessagingException { + this.gotResponse = true; + this.message = message; + } + + } + } diff --git a/src/test/java/org/springframework/integration/flow/config/xml/TransactionalFlowTest.java b/src/test/java/org/springframework/integration/flow/config/xml/TransactionalFlowTest.java index 63bf987..30269ca 100644 --- a/src/test/java/org/springframework/integration/flow/config/xml/TransactionalFlowTest.java +++ b/src/test/java/org/springframework/integration/flow/config/xml/TransactionalFlowTest.java @@ -87,14 +87,14 @@ public class TransactionalFlowTest { } @Test - public void testFlowRollback() { + public void testFlowRollbackWithGatewayErrorChannel() { ApplicationContext applicationContext = new ClassPathXmlApplicationContext("/TransactionalFlowTest-context.xml","/txmanager-config.xml"); MessageChannel inputChannel = applicationContext.getBean("inputC", MessageChannel.class); - SubscribableChannel outputChannel = applicationContext.getBean("outputC", SubscribableChannel.class); + SubscribableChannel errorChannel = applicationContext.getBean("errorChannel", SubscribableChannel.class); StubTransactionManager transactionManager = applicationContext.getBean(StubTransactionManager.class); Handler handler = new Handler(); - outputChannel.subscribe(handler); + errorChannel.subscribe(handler); inputChannel.send(new GenericMessage("rollback")); diff --git a/src/test/resources/FlowContextTest-context.xml b/src/test/resources/FlowContextTest-context.xml index 1ec4d02..9b24b2d 100644 --- a/src/test/resources/FlowContextTest-context.xml +++ b/src/test/resources/FlowContextTest-context.xml @@ -13,7 +13,8 @@ + output-channel="outputC" + /> diff --git a/src/test/resources/FlowWithErrorTest-context.xml b/src/test/resources/FlowWithErrorTest-context.xml index 8d5a40f..5d2f0f0 100644 --- a/src/test/resources/FlowWithErrorTest-context.xml +++ b/src/test/resources/FlowWithErrorTest-context.xml @@ -9,17 +9,16 @@ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd"> - + - - - + output-channel="outputC" + error-channel="errorChannel"/> + - + + diff --git a/src/test/resources/TransactionalFlowTest-context.xml b/src/test/resources/TransactionalFlowTest-context.xml index 7f4c883..57ba3d3 100644 --- a/src/test/resources/TransactionalFlowTest-context.xml +++ b/src/test/resources/TransactionalFlowTest-context.xml @@ -10,7 +10,8 @@ + input-channel="inputC" output-channel="outputC" + error-channel="errorChannel"/> diff --git a/src/test/resources/log4j.xml b/src/test/resources/log4j.xml index 75e0f19..d99732e 100644 --- a/src/test/resources/log4j.xml +++ b/src/test/resources/log4j.xml @@ -12,11 +12,11 @@ - + - +