Added error channel, modified error handling to be more consistent with other SI channel adapters. Added check to ensure all flow MessageChannels are private"

This commit is contained in:
David Turanski
2011-08-28 17:47:04 -04:00
parent 16951219a4
commit 0a4d85b973
15 changed files with 391 additions and 500 deletions

View File

@@ -1,8 +1,10 @@
package org.springframework.integration.flow;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
@@ -121,6 +123,7 @@ public class Flow implements InitializingBean, BeanNameAware, ChannelResolver, A
Assert.notEmpty(configLocations, "configLocations cannot be empty");
/*
* create a child application context
*/
@@ -231,54 +234,33 @@ public class Flow implements InitializingBean, BeanNameAware, ChannelResolver, A
MutablePropertySources propertySources = flowContext.getEnvironment().getPropertySources();
propertySources.addLast(propertySource);
}
}
private void validatePortMapping() {
Assert.notEmpty(this.flowConfiguration.getPortConfigurations(),
"flow configuration contains no port configurations");
/*
* Verify that no channels in the flow context are shared in the parent context
*/
List<String> errors = new ArrayList<String>();
for (PortConfiguration portConfiguration : this.flowConfiguration.getPortConfigurations()) {
String inputChannelName = (String) portConfiguration.getInputChannel();
validateFlowChannelDefinition(inputChannelName, errors, false);
for (String outputPortName : portConfiguration.getOutputPortNames()) {
String outputChannelName = (String) portConfiguration.getOutputChannel(outputPortName);
validateFlowChannelDefinition(outputChannelName, errors, true);
List<String> channelNames = Arrays.asList(this.flowContext.getBeanNamesForType(MessageChannel.class));
Set<String> referencedMessageChannels = FlowUtils.getReferencedMessageChannels(this.flowContext.getBeanFactory(),5);
for (String referencedMessageChannel: referencedMessageChannels) {
if (!channelNames.contains(referencedMessageChannel)) {
errors.add("Flow references channel [" + referencedMessageChannel + "] defined in the parent context. " +
"This channel should be explicitly defined in the flow context");
}
}
if (errors.size() > 0 ) {
throw new BeanDefinitionValidationException("\n"+StringUtils.arrayToDelimitedString(errors.toArray(),"\n"));
}
}
/*
* If flow context does not contain the bean definition then the definition
* comes from the parent context. The flow should should still work with a
* 'global' PublishSubscribeChannel output channel
*/
private void validateFlowChannelDefinition(String channelName, List<String> errors, boolean allowPubSub) {
MessageChannel channel = this.flowContext.getBean(channelName, MessageChannel.class);
if (!this.flowContext.containsBeanDefinition(channelName)) {
if (channel instanceof PublishSubscribeChannel && allowPubSub) {
if (logger.isDebugEnabled()) {
logger.warn("Flow '" + this.flowId +"'" +
" is sharing the publish-subscribe channel '" + channelName +"'" +
" with the parent context.");
}
} else {
errors.add("The flow channel '"
+ channelName
+ "' in flow '"
+ this.flowId
+ "' conflicts with a bean definition in the parent context. It must be explicitly declared in the flow'");
}
}
}
private void bridgeMessagingPorts() {
/*

View File

@@ -40,6 +40,8 @@ public class FlowMessageHandlerFactoryBean extends AbstractSimpleMessageHandlerF
private volatile Flow flow;
private volatile String inputPortName;
private volatile MessageChannel errorChannel;
private volatile long timeout;
@@ -54,6 +56,8 @@ public class FlowMessageHandlerFactoryBean extends AbstractSimpleMessageHandlerF
FlowMessageHandler flowMessageHandler = new FlowMessageHandler(flowInputChannel, flow.getFlowOutputChannel(), timeout);
flowMessageHandler.setErrorChannel(this.errorChannel);
return flowMessageHandler;
}
@@ -81,6 +85,14 @@ public class FlowMessageHandlerFactoryBean extends AbstractSimpleMessageHandlerF
public void setTimeout(long timeout) {
this.timeout = timeout;
}
/**
*
* @param errorChannel
*/
public void setErrorChannel(MessageChannel errorChannel) {
this.errorChannel = errorChannel;
}
@Override
@@ -97,7 +109,6 @@ public class FlowMessageHandlerFactoryBean extends AbstractSimpleMessageHandlerF
this.flowConfiguration = this.flow.getFlowConfiguration().getConfigurationForInputPort(
this.inputPortName);
}
}
}

View File

@@ -17,9 +17,12 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.integration.MessageChannel;
@@ -34,73 +37,107 @@ import org.springframework.util.ResourceUtils;
*
*/
public class FlowUtils {
private FlowUtils() {}
private FlowUtils() {
}
/**
* Create a bridge
*
* @param inputChannel
* @param outputChannel
*/
* Create a bridge
*
* @param inputChannel
* @param outputChannel
*/
public static void bridgeChannels(SubscribableChannel inputChannel, MessageChannel outputChannel) {
BridgeHandler bridgeHandler = new BridgeHandler();
bridgeHandler.setOutputChannel(outputChannel);
inputChannel.subscribe(bridgeHandler);
}
public static void bridgeChannels(SubscribableChannel inputChannel, MessageChannel outputChannel) {
BridgeHandler bridgeHandler = new BridgeHandler();
bridgeHandler.setOutputChannel(outputChannel);
inputChannel.subscribe(bridgeHandler);
}
/**
* Register a bean with "flow" prefix
*
* @param beanDefinition
* @param registry
* @return
*/
public static String registerBeanDefinition(BeanDefinition beanDefinition, BeanDefinitionRegistry registry) {
String beanName = BeanDefinitionReaderUtils.generateBeanName(beanDefinition, registry);
beanName = "flow." + beanName;
String strIndex = StringUtils.substringAfter(beanName, "#");
int index = Integer.valueOf(strIndex);
while (registry.isBeanNameInUse(beanName)) {
index++;
beanName = beanName.replaceAll("#\\d$", "#" + (index));
}
registry.registerBeanDefinition(beanName, beanDefinition);
return beanName;
}
/**
* Register a bean with "flow" prefix
*
* @param beanDefinition
* @param registry
* @return
*/
public static String registerBeanDefinition(BeanDefinition beanDefinition, BeanDefinitionRegistry registry) {
String beanName = BeanDefinitionReaderUtils.generateBeanName(beanDefinition, registry);
beanName = "flow." + beanName;
String strIndex = org.apache.commons.lang.StringUtils.substringAfter(beanName, "#");
int index = Integer.valueOf(strIndex);
while (registry.isBeanNameInUse(beanName)) {
index++;
beanName = beanName.replaceAll("#\\d$", "#" + (index));
}
registry.registerBeanDefinition(beanName, beanDefinition);
return beanName;
}
/**
* Read the flow documentation resource into a String if it exists.
* The location is classpath:META-INF/spring/integration/flows/[flowId]/flow.doc
* @param flowId the flow id
* @return the documentation
*/
public static String getDocumentation(String flowId) {
/**
* Read the flow documentation resource into a String if it exists. The
* location is classpath:META-INF/spring/integration/flows/[flowId]/flow.doc
* @param flowId the flow id
* @return the documentation
*/
public static String getDocumentation(String flowId) {
String path = String.format("classpath:META-INF/spring/integration/flows/%s/flow.doc", flowId);
String path = String.format("classpath:META-INF/spring/integration/flows/%s/flow.doc", flowId);
try {
File file = ResourceUtils.getFile(path);
try {
File file = ResourceUtils.getFile(path);
BufferedReader br = new BufferedReader(new FileReader(file));
BufferedReader br = new BufferedReader(new FileReader(file));
String line;
StringBuilder result = new StringBuilder();
while ((line = br.readLine()) != null) {
result.append(line).append("\n");
}
br.close();
return result.toString();
String line;
StringBuilder result = new StringBuilder();
while ((line = br.readLine()) != null) {
result.append(line).append("\n");
}
} catch (FileNotFoundException e) {
return "no help available";
} catch (IOException e) {
e.printStackTrace();
return "no help available";
}
}
br.close();
return result.toString();
}
catch (FileNotFoundException e) {
return "no help available";
}
catch (IOException e) {
e.printStackTrace();
return "no help available";
}
}
public static Set<String> getReferencedMessageChannels(ConfigurableListableBeanFactory beanFactory, int max) {
String[] beans = beanFactory.getBeanNamesForType(Object.class);
Set<String> messageChannels = new HashSet<String>();
_getReferencedMessageChannels(beanFactory, beans, messageChannels);
return Collections.unmodifiableSet(messageChannels);
}
private static void _getReferencedMessageChannels(ConfigurableListableBeanFactory beanFactory, String[] beans,
Set<String> messageChannels) {
for (String bean : beans) {
if (!bean.startsWith("(inner bean)") && !bean.equals("nullChannel")) {
Class<?> clazz = null;
if (beanFactory.containsBean(bean)) {
clazz = beanFactory.getType(bean);
}
if (clazz != null) {
if (MessageChannel.class.isAssignableFrom(clazz)) {
messageChannels.add(bean);
}
String[] dependencies = beanFactory.getDependenciesForBean(bean);
if (dependencies.length > 0) {
_getReferencedMessageChannels(beanFactory, dependencies, messageChannels);
}
}
}
}
}
}

View File

@@ -39,6 +39,7 @@ public class FlowOutboundGatewayParser extends AbstractConsumerEndpointParser {
IntegrationNamespaceUtils.setValueIfAttributeDefined(flowHandlerBuilder, element, "input-port","inputPortName");
IntegrationNamespaceUtils.setValueIfAttributeDefined(flowHandlerBuilder, element, "timeout");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(flowHandlerBuilder, element, "error-channel");
return flowHandlerBuilder;
}

View File

@@ -34,71 +34,85 @@ import org.springframework.integration.support.MessageBuilder;
/**
* A MessageHandler for Handling Flow input and output. Sends messages on its
* input channel to the flow input channel and replies with the flow output (if there
* is one) to its output channel.
* input channel to the flow input channel and replies with the flow output (if
* there is one) to its output channel.
*
* Internally creates a subscriber to a PublishSubscribeChannel automatically
* created for the flow. Since all FlowMessageHandler instances subscribe to
* this channel, a unique flow conversationId is used to correlate flow input
* and output messages
*
* The output message contains a FLOW_OUTPUT_PORT_HEADER identifying which flow
* output port produced the message
* @see FlowUtils
*
* Error handling is done in a standard way. If the flow includes an error
* channel which is bound to an output port, the handler will send the
* ErrorMessage to the outputChannel. If the flow throws an exception, the
* handler will create an ErrorMessage and send it to the outputChannel
*
* Internally creates a subscriber to a PublishSubscribeChannel automatically created for
* the flow. Since all FlowMessageHandler instances subscribe to this channel, a unique
* flow conversationId is used to correlate flow input and output messages
*
* The output message contains a FLOW_OUTPUT_PORT_HEADER identifying which flow output port produced the message
* @see FlowUtils
*
* Error handling is done in a standard way. If the flow includes an error channel which is bound to an output port, the
* handler will send the ErrorMessage to the outputChannel. If the flow throws an exception, the handler will create an
* ErrorMessage and send it to the outputChannel
*
* @author David Turanski
*
*/
public class FlowMessageHandler extends AbstractReplyProducingMessageHandler {
private static Log log = LogFactory.getLog(FlowMessageHandler.class);
private final MessageChannel flowInputChannel;
private final SubscribableChannel flowOutputChannel;
private volatile MessageChannel errorChannel;
private final long timeout;
/**
*
* @param flowInputChannel the Flow input channel
* @param flowOutputChannel a PublishSubscribeChannel internally created and bridged to all flow output channels
* @param flowOutputChannel a PublishSubscribeChannel internally created and
* bridged to all flow output channels
* @param timeout the send timeout duration
*/
public FlowMessageHandler(MessageChannel flowInputChannel, SubscribableChannel flowOutputChannel, long timeout) {
this.flowInputChannel = flowInputChannel;
this.flowOutputChannel = flowOutputChannel;
this.flowOutputChannel = flowOutputChannel;
this.timeout = timeout;
}
public void setErrorChannel(MessageChannel errorChannel) {
this.errorChannel = errorChannel;
}
@Override
protected Object handleRequestMessage(Message<?> requestMessage) {
UUID conversationId = requestMessage.getHeaders().getId();
Map<String, Object> flowConversationIdHeader = Collections.singletonMap(FlowConstants.FLOW_CONVERSATION_ID_HEADER,
(Object) conversationId);
Map<String, Object> flowConversationIdHeader = Collections.singletonMap(
FlowConstants.FLOW_CONVERSATION_ID_HEADER, (Object) conversationId);
Message<?> message = MessageBuilder.fromMessage(requestMessage).copyHeaders(flowConversationIdHeader)
.build();
try {
ResponseMessageHandler responseMessageHandler = new ResponseMessageHandler(conversationId);
flowOutputChannel.subscribe(responseMessageHandler);
flowInputChannel.send(message,timeout);
flowInputChannel.send(message, timeout);
return responseMessageHandler.getResponse();
}
catch (MessagingException me) {
log.error(me.getMessage(), me);
if (conversationId.equals(me.getFailedMessage().getHeaders().get(FlowConstants.FLOW_CONVERSATION_ID_HEADER))) {
return new ErrorMessage(me,Collections.singletonMap(FlowConstants.FLOW_OUTPUT_PORT_HEADER,
(Object)FlowConstants.FLOW_HANDLER_EXCEPTION_HEADER_VALUE));
if (conversationId
.equals(me.getFailedMessage().getHeaders().get(FlowConstants.FLOW_CONVERSATION_ID_HEADER))) {
if (errorChannel != null) {
errorChannel.send(new ErrorMessage(me, Collections.singletonMap(
FlowConstants.FLOW_OUTPUT_PORT_HEADER,
(Object) FlowConstants.FLOW_HANDLER_EXCEPTION_HEADER_VALUE)));
} else {
throw me;
}
}
}
return null;
@@ -109,7 +123,9 @@ public class FlowMessageHandler extends AbstractReplyProducingMessageHandler {
*/
private static class ResponseMessageHandler implements MessageHandler {
private final UUID conversationId;
private volatile Message<?> response;
public ResponseMessageHandler(UUID conversationId) {
this.conversationId = conversationId;
}
@@ -123,19 +139,24 @@ public class FlowMessageHandler extends AbstractReplyProducingMessageHandler {
*/
@Override
public void handleMessage(Message<?> message) throws MessagingException {
if (conversationId.equals(message.getHeaders().get(FlowConstants.FLOW_CONVERSATION_ID_HEADER))) {
this.response = message;
} else {
if (message instanceof ErrorMessage){
}
else {
/*
* Response from flow's ErrorChannel which is mapped to an output port.
*/
if (message instanceof ErrorMessage) {
MessagingException me = (MessagingException) message.getPayload();
if (conversationId.equals(me.getFailedMessage().getHeaders().get(FlowConstants.FLOW_CONVERSATION_ID_HEADER))) {
this.response = message;
if (conversationId.equals(me.getFailedMessage().getHeaders()
.get(FlowConstants.FLOW_CONVERSATION_ID_HEADER))) {
this.response = message;
}
}
}
}
}
public Message<?> getResponse() {
return this.response;
}

View File

@@ -1,3 +1,2 @@
http\://www.springframework.org/schema/integration/flow/spring-integration-flow-2.0.xsd=org/springframework/integration/flow/config/spring-integration-flow-2.0.xsd
http\://www.springframework.org/schema/integration/flow/spring-integration-flow-2.1.xsd=org/springframework/integration/flow/config/spring-integration-flow-2.1.xsd
http\://www.springframework.org/schema/integration/flow/spring-integration-flow.xsd=org/springframework/integration/flow/config/spring-integration-flow-2.1.xsd

View File

@@ -1,197 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<xsd:schema xmlns="http://www.springframework.org/schema/integration/flow"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:bean="http://www.springframework.org/schema/beans"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:tool="http://www.springframework.org/schema/tool"
targetNamespace="http://www.springframework.org/schema/integration/flow"
elementFormDefault="qualified" attributeFormDefault="unqualified">
<xsd:import namespace="http://www.springframework.org/schema/tool" />
<xsd:import namespace="http://www.springframework.org/schema/beans" schemaLocation="http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"/>
<xsd:import namespace="http://www.springframework.org/schema/integration" />
<xsd:element name="flow">
<xsd:annotation>
<xsd:documentation><![CDATA[
Defines an integration flow
]]></xsd:documentation>
</xsd:annotation>
<xsd:complexType>
<xsd:sequence>
<xsd:element ref="bean:props" minOccurs="0"
maxOccurs="unbounded" />
</xsd:sequence>
<xsd:attribute name="id" type="xsd:string"
use="required">
<xsd:annotation>
<xsd:documentation><![CDATA[
The name of the referenced flow
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="properties" type="xsd:string"
use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
An optional reference to a properties object containing optional or required properties provided to configure the flow
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="referenced-bean-locations"
type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
An optional list of config locations containing optional or required bean definitions referenced by the flow
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="help" type="xsd:boolean"
use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
Display port configuration discription
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="flow-id" type="xsd:string"
use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
Flow id
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
</xsd:element>
<xsd:element name="outbound-gateway">
<xsd:complexType>
<xsd:attribute name="input-channel" type="xsd:string"
use="optional">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.core.MessageChannel" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="output-channel" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.core.MessageChannel" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="flow" type="xsd:string"
use="required">
<xsd:annotation>
<xsd:documentation>
<![CDATA[
The name of the referenced flow]]>
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="input-port" type="xsd:string"
use="optional">
<xsd:annotation>
<xsd:documentation>
<![CDATA[
The name of the input port]]>
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
</xsd:element>
<xsd:element name="flow-configuration">
<xsd:annotation>
<xsd:documentation><![CDATA[
Defines a flow configuration (metadata exposed by the flow provider)
]]></xsd:documentation>
</xsd:annotation>
<xsd:complexType>
<xsd:sequence>
<xsd:choice minOccurs="1" maxOccurs="unbounded">
<xsd:element name="port-mapping-ref">
<xsd:complexType>
<xsd:attribute name="bean"
type="xsd:string" />
</xsd:complexType>
</xsd:element>
<xsd:element name="port-mapping" type="PortMappingType" />
</xsd:choice>
</xsd:sequence>
</xsd:complexType>
</xsd:element>
<xsd:element name="port-mapping" type="PortMappingType">
<xsd:annotation>
<xsd:documentation><![CDATA[
Defines an integration flow port mapping configuraiotn
]]></xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:complexType name="PortMappingType">
<xsd:sequence>
<xsd:element name="input-port" minOccurs="0"
maxOccurs="1" type="BasePortType"/>
<xsd:element name="output-port" minOccurs="0"
maxOccurs="unbounded" type="BasePortType" />
</xsd:sequence>
<xsd:attribute name="input-channel" type="xsd:string"
use="optional">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.core.MessageChannel" />
</tool:annotation>
</xsd:appinfo>
<xsd:documentation>
The receiving Message channel of this endpoint
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="output-channel" type="xsd:string"
use="optional">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.core.SubscribableChannel" />
</tool:annotation>
</xsd:appinfo>
<xsd:documentation>
The receiving Message channel of this endpoint
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<xsd:complexType name="BasePortType">
<xsd:annotation>
<xsd:documentation><![CDATA[
Defines an integration flow port binding
]]></xsd:documentation>
</xsd:annotation>
<xsd:attribute name="name" type="xsd:string" use="required" />
<xsd:attribute name="channel" type="xsd:string"
use="required" />
</xsd:complexType>
</xsd:schema>

View File

@@ -1,177 +1,198 @@
<?xml version="1.0" encoding="UTF-8"?>
<xsd:schema xmlns="http://www.springframework.org/schema/integration/flow"
xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:bean="http://www.springframework.org/schema/beans"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:tool="http://www.springframework.org/schema/tool"
targetNamespace="http://www.springframework.org/schema/integration/flow"
elementFormDefault="qualified" attributeFormDefault="unqualified">
xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:bean="http://www.springframework.org/schema/beans"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:tool="http://www.springframework.org/schema/tool"
targetNamespace="http://www.springframework.org/schema/integration/flow"
elementFormDefault="qualified" attributeFormDefault="unqualified">
<xsd:import namespace="http://www.springframework.org/schema/tool" />
<xsd:import namespace="http://www.springframework.org/schema/beans" schemaLocation="http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"/>
<xsd:import namespace="http://www.springframework.org/schema/integration" />
<xsd:import namespace="http://www.springframework.org/schema/tool" />
<xsd:import namespace="http://www.springframework.org/schema/beans"
schemaLocation="http://www.springframework.org/schema/beans/spring-beans-3.1.xsd" />
<xsd:import namespace="http://www.springframework.org/schema/integration" />
<xsd:element name="flow">
<xsd:annotation>
<xsd:documentation><![CDATA[
<xsd:element name="flow">
<xsd:annotation>
<xsd:documentation><![CDATA[
Defines an integration flow
]]></xsd:documentation>
</xsd:annotation>
<xsd:complexType>
<xsd:sequence>
<xsd:element ref="bean:props" minOccurs="0"
maxOccurs="unbounded" />
</xsd:sequence>
<xsd:attribute name="id" type="xsd:string"
use="required">
<xsd:annotation>
<xsd:documentation><![CDATA[
</xsd:annotation>
<xsd:complexType>
<xsd:sequence>
<xsd:element ref="bean:props" minOccurs="0" maxOccurs="unbounded" />
</xsd:sequence>
<xsd:attribute name="id" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation><![CDATA[
The name of the referenced flow
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="properties" type="xsd:string"
use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="properties" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
An optional reference to a properties object containing optional or required properties provided to configure the flow
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="referenced-bean-locations"
type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="referenced-bean-locations" type="xsd:string"
use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
An optional list of config locations containing optional or required bean definitions referenced by the flow
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="help" type="xsd:boolean"
use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="help" type="xsd:boolean" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
Display port configuration discription
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="flow-id" type="xsd:string"
use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="flow-id" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
Flow id
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
</xsd:element>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
</xsd:element>
<xsd:element name="outbound-gateway">
<xsd:complexType>
<xsd:attribute name="input-channel" type="xsd:string"
use="optional">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.core.MessageChannel" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="output-channel" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.core.MessageChannel" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="flow" type="xsd:string"
use="required">
<xsd:annotation>
<xsd:documentation>
<xsd:element name="outbound-gateway">
<xsd:complexType>
<xsd:attribute name="input-channel" type="xsd:string"
use="optional">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.core.MessageChannel" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="output-channel" type="xsd:string"
use="optional">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.core.MessageChannel" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="error-channel" type="xsd:string"
use="optional">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.core.MessageChannel" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="order">
<xsd:annotation>
<xsd:documentation>
Specifies the order for invocation when this endpoint is connected as a
subscriber to a channel. This is particularly relevant when that
channel
is using a "failover" dispatching strategy. It has no effect when
this
endpoint itself is a Polling Consumer for a channel with a queue.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="flow" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
<![CDATA[
The name of the referenced flow]]>
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="input-port" type="xsd:string"
use="optional">
<xsd:annotation>
<xsd:documentation>
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="input-port" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>
<![CDATA[
The name of the input port]]>
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
</xsd:element>
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
</xsd:element>
<xsd:element name="flow-configuration">
<xsd:annotation>
<xsd:documentation><![CDATA[
<xsd:element name="flow-configuration">
<xsd:annotation>
<xsd:documentation><![CDATA[
Defines a flow configuration (metadata exposed by the flow provider)
]]></xsd:documentation>
</xsd:annotation>
<xsd:complexType>
<xsd:sequence>
<xsd:element name="port-mapping" type="PortMappingType" minOccurs="1" maxOccurs="unbounded"/>
</xsd:sequence>
</xsd:annotation>
<xsd:complexType>
<xsd:sequence>
<xsd:element name="port-mapping" type="PortMappingType"
minOccurs="1" maxOccurs="unbounded" />
</xsd:sequence>
</xsd:complexType>
</xsd:element>
</xsd:complexType>
</xsd:element>
<xsd:complexType name="PortMappingType">
<xsd:sequence>
<xsd:element name="input-port" minOccurs="0"
maxOccurs="1" type="BasePortType"/>
<xsd:element name="output-port" minOccurs="0"
maxOccurs="unbounded" type="BasePortType" />
</xsd:sequence>
<xsd:attribute name="input-channel" type="xsd:string"
use="optional">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.core.MessageChannel" />
</tool:annotation>
</xsd:appinfo>
<xsd:documentation>
The receiving Message channel of this endpoint
<xsd:complexType name="PortMappingType">
<xsd:sequence>
<xsd:element name="input-port" minOccurs="0" maxOccurs="1"
type="BasePortType" />
<xsd:element name="output-port" minOccurs="0" maxOccurs="unbounded"
type="BasePortType" />
</xsd:sequence>
<xsd:attribute name="input-channel" type="xsd:string"
use="optional">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.core.MessageChannel" />
</tool:annotation>
</xsd:appinfo>
<xsd:documentation>
The receiving Message channel of this endpoint
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="output-channel" type="xsd:string"
use="optional">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.core.SubscribableChannel" />
</tool:annotation>
</xsd:appinfo>
<xsd:documentation>
The receiving Message channel of this endpoint
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
</xsd:annotation>
</xsd:attribute>
<xsd:complexType name="BasePortType">
<xsd:annotation>
<xsd:documentation><![CDATA[
<xsd:attribute name="output-channel" type="xsd:string"
use="optional">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.core.SubscribableChannel" />
</tool:annotation>
</xsd:appinfo>
<xsd:documentation>
The receiving Message channel of this endpoint
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<xsd:complexType name="BasePortType">
<xsd:annotation>
<xsd:documentation><![CDATA[
Defines an integration flow port binding
]]></xsd:documentation>
</xsd:annotation>
<xsd:attribute name="name" type="xsd:string" use="required" />
<xsd:attribute name="channel" type="xsd:string"
use="required" />
</xsd:complexType>
</xsd:annotation>
<xsd:attribute name="name" type="xsd:string" use="required" />
<xsd:attribute name="channel" type="xsd:string" use="required" />
</xsd:complexType>
</xsd:schema>

View File

@@ -28,7 +28,6 @@ import org.springframework.integration.core.MessageHandler;
import org.springframework.integration.core.PollableChannel;
import org.springframework.integration.core.SubscribableChannel;
import org.springframework.integration.message.GenericMessage;
/**
*
@@ -39,25 +38,25 @@ import org.springframework.integration.message.GenericMessage;
public class FlowWithErrorTest {
@Test
public void testExceptionThrown(){
public void testFlowThrowsExceptionWithGatewayErrorChannel(){
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:/FlowWithErrorTest-context.xml");
MessageChannel input = applicationContext.getBean("inputC",MessageChannel.class);
PollableChannel output = applicationContext.getBean("outputC",PollableChannel.class);
Message<String> msg = new GenericMessage<String>("hello");
input.send(msg);
Message<?> reply = output.receive(100);
assertNotNull(reply);
assertTrue(reply.getPayload() instanceof MessagingException);
MessageChannel inputChannel = applicationContext.getBean("inputC",MessageChannel.class);
SubscribableChannel errorChannel = applicationContext.getBean("errorChannel",SubscribableChannel.class);
Message<String> msg = new GenericMessage<String>("hello");
Handler handler = new Handler();
errorChannel.subscribe(handler);
inputChannel.send(msg);
assertTrue(handler.gotResponse);
}
@Test
public void testDirectCallWithErrorChannel(){
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:/META-INF/spring/integration/flows/subflow5/subflow5-context.xml");
MessageChannel input = applicationContext.getBean("subflow-input",MessageChannel.class);
SubscribableChannel error = applicationContext.getBean("errorChannel",SubscribableChannel.class);
MessageChannel inputChannel = applicationContext.getBean("subflow-input",MessageChannel.class);
SubscribableChannel errorChannel = applicationContext.getBean("errorChannel",SubscribableChannel.class);
error.subscribe(new MessageHandler() {
errorChannel.subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
@@ -66,23 +65,38 @@ public class FlowWithErrorTest {
});
Message<String> msg = new GenericMessage<String>("hello");
assertTrue(input.send(msg));
assertTrue(inputChannel.send(msg));
}
@Test
public void testWithErrorChannel(){
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:/FlowWithErrorTest-context.xml");
MessageChannel input = applicationContext.getBean("inputC1",MessageChannel.class);
MessageChannel inputChannel = applicationContext.getBean("inputC1",MessageChannel.class);
PollableChannel output = applicationContext.getBean("outputC1",PollableChannel.class);
Message<String> msg = new GenericMessage<String>("hello");
input.send(msg);
inputChannel.send(msg);
Message<?> reply = output.receive(100);
assertNotNull(reply);
assertTrue(reply.getPayload() instanceof MessagingException);
}
private static class Handler implements MessageHandler {
public boolean gotResponse;
@SuppressWarnings("unused")
public Message<?> message;
/* (non-Javadoc)
* @see org.springframework.integration.core.MessageHandler#handleMessage(org.springframework.integration.Message)
*/
@Override
public void handleMessage(Message<?> message) throws MessagingException {
this.gotResponse = true;
this.message = message;
}
}
}

View File

@@ -87,14 +87,14 @@ public class TransactionalFlowTest {
}
@Test
public void testFlowRollback() {
public void testFlowRollbackWithGatewayErrorChannel() {
ApplicationContext applicationContext =
new ClassPathXmlApplicationContext("/TransactionalFlowTest-context.xml","/txmanager-config.xml");
MessageChannel inputChannel = applicationContext.getBean("inputC", MessageChannel.class);
SubscribableChannel outputChannel = applicationContext.getBean("outputC", SubscribableChannel.class);
SubscribableChannel errorChannel = applicationContext.getBean("errorChannel", SubscribableChannel.class);
StubTransactionManager transactionManager = applicationContext.getBean(StubTransactionManager.class);
Handler handler = new Handler();
outputChannel.subscribe(handler);
errorChannel.subscribe(handler);
inputChannel.send(new GenericMessage<String>("rollback"));

View File

@@ -13,7 +13,8 @@
<!-- input port not required if only one -->
<int-flow:outbound-gateway flow="subflow"
input-channel="inputC"
output-channel="outputC"/>
output-channel="outputC"
/>
<int:channel id="outputC"/>

View File

@@ -9,17 +9,16 @@
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd">
<int-flow:flow id="subflow4"/>
<int-flow:flow id="subflow5"/>
<int-flow:outbound-gateway
flow="subflow4"
input-port="input"
input-channel="inputC"
output-channel="outputC"/>
<int-flow:flow id="subflow5"/>
output-channel="outputC"
error-channel="errorChannel"/>
<int-flow:outbound-gateway
flow="subflow5"

View File

@@ -35,6 +35,7 @@
</int:service-activator>
<int:channel id="subflow-output"/>
<int:publish-subscribe-channel id="errorChannel"/>
</beans>

View File

@@ -10,7 +10,8 @@
<int-flow:flow id="transactional-flow"/>
<int-flow:outbound-gateway flow="transactional-flow"
input-channel="inputC" output-channel="outputC"/>
input-channel="inputC" output-channel="outputC"
error-channel="errorChannel"/>
<int:channel id="outputC"/>

View File

@@ -12,11 +12,11 @@
<!-- Loggers -->
<logger name="org.springframework">
<level value="info" />
<level value="warn" />
</logger>
<logger name="org.springframework.integration">
<level value="debug" />
<level value="warn" />
</logger>
<!-- Root Logger -->