Simplified configuration and updated README

This commit is contained in:
David Turanski
2011-07-16 16:48:26 -04:00
parent d43f225c31
commit 9c6c4d30ad
15 changed files with 638 additions and 420 deletions

161
README
View File

@@ -1,66 +1,145 @@
Spring Integration Flow
-----------------------
NOTE: If you want to get right to the code, see the unit tests and check out (literally :) the spring-integration-flow-samples project
Goals
---------
This component is exploring mechanisms to encapsulate a referenced Spring Integration message flow as a component.
A flow is a Spring Integration message flow intended for reuse. A flow is accessed via logical "ports" which map
to internal channels.
Spring Integration components already support common enterprise integration patterns. Sometimes it is desirable to
create common message flows which implement higher level messaging patterns or "cross cutting concerns" for your
application environment. This is especially true in more complex applications.
A flow may expose multiple inputs and multiple outputs. A port mapping is defined for each input and has multiple
outputs associated with it. See src/test/resources/META-INF/spring/integration/flows/subflow1/subflow1-context.xml, for example.
Out of the box, a best practice is to define a common message flow in its own bean definition
file which may be imported into other message flows. This approach however has some limitations:
- It's input and output channels are referenced in every consuming flow. Each common flow must ensure unique channel names
- True encapsuation is impossible as any internal channels and components are exposed to the consuming flow. Note that these must have
unique names as well
- Since a common flow is statically bound to channels, it cannot be used in a chain without implementing some type of service-activator/
gateway wrapper.
- Since the common flow is part of the same application context (an imported resource), is not practical to configure multiple instances with
different property values, bean definitions, etc. (As of Spring 3.1 it may possible to do something with environment profiles, but would be overly complex)
Generally, a message flow may behave like a router. For example, a flow may define
a primary output and a discard output. Additionally, it may act like a delayer, providing no immediate response. Or it
could act as an outbound channel adapter, providing no output.
The spring-integration-flow module provides a way to implement and use common flows while addressing the above limitations. A flow is an
abstraction of a Spring Integration component which is itself implemented with Spring Integration. In general, a flow may expose multiple inputs and multiple outputs,
called ports. Each port binds to a channel internal to the flow. So a flow has input ports and output ports. The most common configuration is expected to define
one input port and output port. Other likely configurations are one input and zero to a small number of outputs.
A flow may behave like a router. For example, a flow may define a primary output port for normal processing and a discard port for exceptional cases.
Alternately, it may act like a delayer, providing no immediate response. Or it could act as an outbound channel adapter - a strictly one way or fire-and-forget flow.
The goal is to support these, and potentially other semantics while providing better encapsulation and configuration options. Configuration is provided via properties
and or referenced bean definitions. Each flow is initialized in a child application context. This allows you to configure multiple instances of the same flow differently.
The flow is not bound to input and output channels in the consumer context and may be naturally invoked via a flow outbound gateway. The outbound gateway may be used within
a chain. It is also possible in theory to compose flows of other flows (NOTE: This hasn't been tested yet).
The goal is to support these, and possibly other semantics. Additional goals are:
- Encapsulation: the flow channels, and component should not be included the consumer application context. The consumer need
not know how the flow is configured (i.e, it is contained in a jar).
- Configuration: The flow may reference properties or beans to be provided by the consumer, e.g, a generic XML processor may require an OXM marshaller.
- The flow should provide a description, in terms of ports, properties, and beans it exposes
- It should be easy to implement or wrap an existing configuration as a flow and provide a better option than simple importing
a spring configuration file and sending a message to one of its input channels
Usage
-------
The flow consumer instantiates a flow and defines one or more flow outbound-gateways associated to an input port:
--------
The flow consumer instantiates a flow and configures one or more flow outbound-gateways.
Instantiating a flow is very simple:
<int-flow:flow id="subflow1"/>
The flow also supports a properties attribute and a referenced-bean-locations attribute used to inject properties and a list of configuration files respectively.
Additionally any bean definition may be or property in the parent context may be inherited by the flow.
The above bean definition instantiates a flow defined as "subflow1". The flow id references an existing flow implementation, presumably packaged in a
separate jar. The flow element also provides optional 'properties' attribute and a 'referenced-bean-locations' attributes to inject properties and a list of
bean definition locations respectively. Note that any bean definition or property in the parent context may also be referenced (inherited) by the flow. As
an alternative to the properties attribute which references a util:properties bean, A properties object may be configured as an inner bean. The following are
functionally equivalent:
-----------------------------------------------------------
<util:properties id="myprops">
<prop key="key1>val1</prop>
</util:properties>
An optional flow-id may be used to resolve the flow configuration. By default the id attribute is used as the flow-id. The flow-id is useful if there are multiple
instances of the same flow, each configured differently. Each instance would have a unique id and the same flow-id.
<int-flow:flow id="subflow1" properties="myprops"/>
An optional help attribute will output the flow's documentation to the STDOUT if set to 'true'.
---------------------------------------------------------
<int-flow:flow id="subflow1">
<props>
<prop key="key1>val1</prop>
</props>
</int-flow:flow>
---------------------------------------------------------
If there are multiple instances of the same flow, you must specify a flow-id attribute:
<int-flow:flow id="flow1" flow-id="subflow1">
<props>
<prop key="key1>some value</prop>
</props>
</int-flow:flow>
<int-flow:flow id="flow2" flow-id="subflow1">
<props>
<prop key="key1>another value</prop>
</props>
</int-flow:flow>
<int-flow:outbound-gateway flow="subflow1" input-channel="another-input"
output-channel="another-output" input-port="gateway-input"/>
A message sent on the input-channel is delegated to the flow. The message on the output-channel is a response from one of the output
ports. The output port name is contained in the response header 'flow.output.port'
By default the id attribute is used as the flow-id.
The input-port attribute is optional -- if the flow defines a single input port it will be mapped by default.
An optional help attribute, if set to true will output the flow's description document (if there is one) to the STDOUT.
The flow is invoked via an outbound-gateway:
<int-flow:outbound-gateway flow="flow1" input-channel="inputChannel1" output-channel="outputChannel1"/>
<int-flow:outbound-gateway flow="flow2" input-channel="inputChannel2" output-channel="outputChannel2"/>
A message sent on the gateway's input-channel is delegated to the flow. The message on the output-channel is a response from one of the flows output
ports. The output port name is contained in the response message header 'flow.output.port'
NOTE: An optional input-port attribute is available if the flow defines multiple inputs, otherwise the input port it will be automatically mapped.
Flows may also be used in a chain just as any AbstractReplyProducingMessageHandler:
<chain input-channel="inputChannel" output-channel="outputChannel">
<int-flow:outbound-gateway flow="flow1"/>
<int-flow:outbound-gateway flow="flow2"/>
</chain>
Flow Implementation
Implementing a Flow
-------------------
The flow element creates a Flow instance (eventually, configured with properties, referenced beans, etc.) The flow id is used to derive the flow's
spring bean definition file by convention (classpath:META-INF/spring/flows/[flow-id]/*.xml). This bean definition file and any referenced bean locations
will be used to create a child application context. The flow context must provide a FlowConfiguration containing the metadata describing the input ports,
output ports, and referenced beans and properties.
The flow element is used to locate the flow's spring bean definition file(s) by convention (classpath:META-INF/spring/flows/[flow-id]/*.xml). It's bean definition
files and any referenced-bean-locations will be used to create a child application context. The flow context must provide a FlowConfiguration bean which defines the
flows input and output ports and maps them to internal input and output channels.
The flow implementer should also create a text file classpath:META-INF/spring/flows/[flow-id]/flow.doc which will contain the 'help' contents.
Namespace support is provided for FlowConfiguration. In the simplest case, a flow implementation encapsulates a Spring Integration flow with a single input-channel and
a single output-channel. The required configuration is simply declared:
Currently, all defined outputs are bridged to a PublishSubscribeChannel which acts as a single 'flowOutputChannel'.
Each flow outbound-gateway instance is backed by a FlowMessageHandler that bridges the 'flow output channel' to its own QueueChannel. This emulates
a JMS topic. Each flow message handler sends the request message to the flow input channel corresponding to the input port and checks
its queue for a response. A correlation id (flow conversation id) is used to correlate the response to the request.
<int-flow:flow-configuration>
<int-flow:port-mapping input-channel="inputChannel" output-channel="outputChannel"/>
</int-flow:flow-configuration>
If the flow message handler catches an exception, it will convert it to an ErrorMessage response. Alternately, the flow can map its errorChannel to an output port
Note: The output-channel attribute is optional if the flow does not produce a response.
The response message contains a 'flow.output.port' header indicating which output port provided the response.
For more complex scenarios, the flow configuration supports multiple port-mappings, each bound to a single input channel and 0 or more output channels. A configuration
for specifying multiple outputs looks like:
Currently flow input and output channels must inherit from SubscribableChannel, e.g., DirectChannel or PublishSubscribe channel.
<int-flow:flow-configuration>
<int-flow:port-mapping>
<int-flow:input-port name="input" channel="inputChannel"/>
<int-flow:output-port name="output" channel="outputChannel"/>
<int-flow:output-port name="discard" channel="discardChannel"/>
</int-flow:port-mapping>
</int-flow:flow-configuration>
If the flow defines multiple inputs, then multiple port-mapping elements must be configured. Additionally the flow client must specify the input-port in the outbound-gateway.
Flow Description File
----------------------
The user-friendly flow implementer may also create a text file classpath:META-INF/spring/flows/[flow-id]/flow.doc which describes the flow. Its contents will be written to
STDOUT if the 'help' attribute on the client's flow declaration is set to true.
FlowMessageHandler Internals
------------------------------
Currently, all defined outputs are automatically bridged to a PublishSubscribeChannel which acts as a single output channel for the flow. Each flow outbound-gateway instance
is backed by a FlowMessageHandler that bridges the flow output channel to its own QueueChannel instance. This emulates a JMS topic. Each FlowMessageHandler subscribes to the
flow. It sends the request message to the flows input channel mapped to the input port and checks its queue for a response. If there are multiple FlowMessageHandlers subscribed
to the flow, each receive a response on the flow output channel, a correlation id (flow conversation id) is internally generated to correlate the response message to the request message.
If the FlowMessageHandler catches an exception, it will convert it to an ErrorMessage response. Alternately, the flow can map its errorChannel to an output port
Currently flow input and output channels must inherit from SubscribableChannel, e.g., DirectChannel or PublishSubscribe channel.

View File

@@ -19,54 +19,69 @@
package org.springframework.integration.flow;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class ChannelNamePortConfiguration implements PortConfiguration {
private PortMetadata inputPortMetadata;
private List<PortMetadata> outputPortMetadataList;
private PortMetadata inputPortMetadata;
private List<PortMetadata> outputPortMetadataList;
public ChannelNamePortConfiguration(PortMetadata inputPortMetadata, List<PortMetadata> outputPortMetadataList) {
this.outputPortMetadataList = outputPortMetadataList;
this.inputPortMetadata = inputPortMetadata;
}
public ChannelNamePortConfiguration(PortMetadata inputPortMetadata, List<PortMetadata> outputPortMetadataList) {
this.outputPortMetadataList = outputPortMetadataList;
this.inputPortMetadata = inputPortMetadata;
}
@Override
public String getInputPortName() {
return this.inputPortMetadata.getPortName();
}
public ChannelNamePortConfiguration(String inputChannelName, String outputChannelName) {
this.inputPortMetadata = new PortMetadata("input", inputChannelName);
@Override
public String getInputChannel() {
return this.inputPortMetadata.getChannelName();
}
@Override
public String getOutputChannel(String portName) {
PortMetadata portMetadata = (PortMetadata) find(portName);
if (portMetadata != null) {
return portMetadata.getChannelName();
}
return null;
}
@Override
public List<String> getOutputPortNames() {
List<String> results = new ArrayList<String>();
for (PortMetadata portMetadata : outputPortMetadataList ) {
results.add(portMetadata.getPortName());
}
return results;
}
public PortMetadata find(String portName){
for (PortMetadata portMetadata : outputPortMetadataList ) {
if (portName.equals(portMetadata.getPortName())){
return portMetadata;
}
if (outputChannelName != null) {
PortMetadata outputPortMetadata = new PortMetadata("output", outputChannelName);
this.outputPortMetadataList = Collections.singletonList(outputPortMetadata);
} else {
// this.outputPortMetadataList = new ArrayList<PortMetadata>();
}
return null;
}
}
@Override
public String getInputPortName() {
return this.inputPortMetadata.getPortName();
}
@Override
public String getInputChannel() {
return this.inputPortMetadata.getChannelName();
}
@Override
public String getOutputChannel(String portName) {
PortMetadata portMetadata = (PortMetadata) findOutputPort(portName);
if (portMetadata != null) {
return portMetadata.getChannelName();
}
return null;
}
@Override
public List<String> getOutputPortNames() {
List<String> results = new ArrayList<String>();
if (outputPortMetadataList != null) {
for (PortMetadata portMetadata : outputPortMetadataList) {
results.add(portMetadata.getPortName());
}
}
return results;
}
private PortMetadata findOutputPort(String portName) {
if (outputPortMetadataList != null) {
for (PortMetadata portMetadata : outputPortMetadataList) {
if (portName.equals(portMetadata.getPortName())) {
return portMetadata;
}
}
}
return null;
}
}

View File

@@ -34,164 +34,171 @@ import org.springframework.util.StringUtils;
*/
public class Flow implements InitializingBean, BeanNameAware, ChannelResolver, ApplicationContextAware {
private static Log logger = LogFactory.getLog(Flow.class);
private static Log logger = LogFactory.getLog(Flow.class);
private volatile ClassPathXmlApplicationContext flowContext;
private ApplicationContext applicationContext;
private volatile ClassPathXmlApplicationContext flowContext;
private volatile FlowConfiguration flowConfiguration;
private ApplicationContext applicationContext;
private volatile String[] configLocations;
private volatile FlowConfiguration flowConfiguration;
private volatile String[] referencedBeanLocations;
private volatile String[] configLocations;
private volatile Properties flowProperties;
private volatile String[] referencedBeanLocations;
private volatile String beanName;
private volatile String flowId;
private volatile Properties flowProperties;
private volatile ChannelResolver flowChannelResolver;
private volatile String beanName;
private volatile PublishSubscribeChannel flowOutputChannel;
private volatile String flowId;
private volatile boolean help;
private volatile ChannelResolver flowChannelResolver;
public Flow() {
private volatile PublishSubscribeChannel flowOutputChannel;
}
private volatile boolean help;
public Flow(String[] configLocations) {
this.configLocations = configLocations;
}
public Flow() {
@Override
public void afterPropertiesSet() {
if (this.flowId == null){
this.flowId = this.beanName;
}
if (this.help) {
}
public Flow(Properties flowProperties, String[] configLocations) {
this.flowProperties = flowProperties;
this.configLocations = configLocations;
}
public Flow(String[] configLocations) {
this.configLocations = configLocations;
}
@Override
public void afterPropertiesSet() {
if (this.flowId == null) {
this.flowId = this.beanName;
}
if (this.help) {
System.out.println(FlowUtils.getDocumentation(this.flowId));
}
if (configLocations == null) {
configLocations = new String[] { String.format(
"classpath:META-INF/spring/integration/flows/%s/*.xml", this.flowId) };
}
if (configLocations == null) {
configLocations = new String[] { String.format("classpath:META-INF/spring/integration/flows/%s/*.xml",
this.flowId) };
}
if (referencedBeanLocations != null) {
configLocations = (String[]) ArrayUtils.addAll(configLocations, referencedBeanLocations);
}
if (referencedBeanLocations != null) {
configLocations = (String[]) ArrayUtils.addAll(configLocations, referencedBeanLocations);
}
logger.debug("instantiating flow context from configLocations ["
+ StringUtils.arrayToCommaDelimitedString(configLocations) + "]");
logger.debug("instantiating flow context from configLocations ["
+ StringUtils.arrayToCommaDelimitedString(configLocations) + "]");
Assert.notEmpty(configLocations, "configLocations cannot be empty");
Assert.notEmpty(configLocations, "configLocations cannot be empty");
flowContext = new ClassPathXmlApplicationContext(applicationContext);
flowContext = new ClassPathXmlApplicationContext(applicationContext);
addReferencedProperties();
addReferencedProperties();
this.flowContext.setConfigLocations(configLocations);
this.flowContext.refresh();
this.flowContext.refresh();
this.flowConfiguration = flowContext.getBean(FlowConfiguration.class);
Assert.notNull(flowConfiguration, "flow context does not contain a flow configuration");
Assert.notNull(flowConfiguration, "flow context does not contain a flow configuration");
validatePortMapping();
validatePortMapping();
this.flowChannelResolver = new BeanFactoryChannelResolver(flowContext);
bridgeMessagingPorts();
this.flowChannelResolver = new BeanFactoryChannelResolver(flowContext);
}
bridgeMessagingPorts();
public FlowConfiguration getFlowConfiguration() {
return this.flowConfiguration;
}
}
@Override
public void setBeanName(String name) {
this.beanName = name;
public FlowConfiguration getFlowConfiguration() {
return this.flowConfiguration;
}
}
@Override
public void setBeanName(String name) {
this.beanName = name;
public String getBeanName() {
return this.beanName;
}
}
public void setFlowId(String flowId) {
public String getBeanName() {
return this.beanName;
}
public void setFlowId(String flowId) {
this.flowId = flowId;
}
public void setReferencedBeanLocations(String[] referencedBeanLocations) {
this.referencedBeanLocations = referencedBeanLocations;
}
this.referencedBeanLocations = referencedBeanLocations;
}
public void setProperties(Properties flowProperties) {
this.flowProperties = flowProperties;
}
public void setProperties(Properties flowProperties) {
this.flowProperties = flowProperties;
}
public Properties getProperties() {
return this.flowProperties;
}
public void setHelp(boolean help) {
this.help = help;
}
public void setHelp(boolean help) {
this.help = help;
}
public PublishSubscribeChannel getFlowOutputChannel() {
return flowOutputChannel;
}
public PublishSubscribeChannel getFlowOutputChannel() {
return flowOutputChannel;
}
public void setFlowOutputChannel(PublishSubscribeChannel flowOutputChannel) {
this.flowOutputChannel = flowOutputChannel;
}
public void setFlowOutputChannel(PublishSubscribeChannel flowOutputChannel) {
this.flowOutputChannel = flowOutputChannel;
}
@Override
public MessageChannel resolveChannelName(String channelName) {
return flowChannelResolver.resolveChannelName(channelName);
}
@Override
public MessageChannel resolveChannelName(String channelName) {
return flowChannelResolver.resolveChannelName(channelName);
}
private void addReferencedProperties() {
if (flowProperties != null) {
PropertySource<?> propertySource = new PropertiesPropertySource("flowProperties", flowProperties);
private void addReferencedProperties() {
if (flowProperties != null) {
PropertySource<?> propertySource = new PropertiesPropertySource("flowProperties", flowProperties);
MutablePropertySources propertySources = flowContext.getEnvironment().getPropertySources();
propertySources.addLast(propertySource);
}
MutablePropertySources propertySources = flowContext.getEnvironment().getPropertySources();
propertySources.addLast(propertySource);
}
}
}
private void validatePortMapping() {
Assert.notEmpty(this.flowConfiguration.getPortConfigurations(),
"flow configuration contains no port configurations");
}
private void validatePortMapping() {
Assert.notEmpty(this.flowConfiguration.getPortConfigurations(),
"flow configuration contains no port configurations");
}
private void bridgeMessagingPorts() {
private void bridgeMessagingPorts() {
/*
* create a bridge for each target output port to the flow outputChannel
*/
for (PortConfiguration targetPortConfiguration : this.getFlowConfiguration()
.getPortConfigurations()) {
for (String outputPort : targetPortConfiguration.getOutputPortNames()) {
String targetOutputChannelName = (String) targetPortConfiguration.getOutputChannel(outputPort);
SubscribableChannel inputChannel = (SubscribableChannel) resolveChannelName(targetOutputChannelName);
/*
* create a bridge for each target output port to the flow outputChannel
*/
for (PortConfiguration targetPortConfiguration : this.getFlowConfiguration().getPortConfigurations()) {
for (String outputPort : targetPortConfiguration.getOutputPortNames()) {
String targetOutputChannelName = (String) targetPortConfiguration.getOutputChannel(outputPort);
SubscribableChannel inputChannel = (SubscribableChannel) resolveChannelName(targetOutputChannelName);
((AbstractMessageChannel)inputChannel).addInterceptor(new FlowInterceptor(outputPort));
((AbstractMessageChannel) inputChannel).addInterceptor(new FlowInterceptor(outputPort));
logger.debug("creating output bridge on [" + outputPort + "] inputChannelName = ["
+ targetOutputChannelName + "] outputChannel = [" + this.flowOutputChannel + "]");
FlowUtils.bridgeChannels(inputChannel, this.flowOutputChannel);
}
}
}
logger.debug("creating output bridge on [" + outputPort + "] inputChannelName = ["
+ targetOutputChannelName + "] outputChannel = [" + this.flowOutputChannel + "]");
FlowUtils.bridgeChannels(inputChannel, this.flowOutputChannel);
}
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
this.applicationContext = applicationContext;
}
}

View File

@@ -15,6 +15,8 @@
*/
package org.springframework.integration.flow.config.xml;
import groovy.sql.OutParameter;
import java.util.List;
import org.springframework.beans.factory.config.BeanDefinition;
@@ -23,6 +25,7 @@ import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.support.ManagedList;
import org.springframework.beans.factory.xml.BeanDefinitionParser;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
import org.springframework.integration.flow.ChannelNamePortConfiguration;
import org.springframework.integration.flow.FlowConfiguration;
import org.springframework.integration.flow.PortMetadata;
@@ -48,9 +51,36 @@ public class FlowConfigurationParser implements BeanDefinitionParser {
ManagedList<Object> portConfigList = new ManagedList<Object>();
for (Element el : portMappings) {
BeanDefinition portConfiguration = buildFlowProviderPortConfiguration(el, parserContext);
portConfigList.add(portConfiguration);
if (!DomUtils.getChildElements(el).isEmpty()){
if (el.hasAttribute("input-channel") || el.hasAttribute("output-channel")){
parserContext.getReaderContext().error(
"port-mapping cannot include both channel attributes and child elements",
flowConfigurationBuilder);
}
BeanDefinition portConfiguration = buildFlowProviderPortConfiguration(el, parserContext);
portConfigList.add(portConfiguration);
}
else
{
// A default port configuration
if (!(el.hasAttribute("input-channel"))){
parserContext.getReaderContext().error(
"port-mapping with no child elements must include an 'input-channel' attribute",
flowConfigurationBuilder);
}
BeanDefinitionBuilder portConfigurationBuilder = BeanDefinitionBuilder
.genericBeanDefinition(ChannelNamePortConfiguration.class);
portConfigurationBuilder.addConstructorArgValue(el.getAttribute("input-channel"));
if (el.hasAttribute("output-channel")){
portConfigurationBuilder.addConstructorArgValue(el.getAttribute("output-channel"));
} else {
portConfigurationBuilder.addConstructorArgValue(null);
}
portConfigList.add(portConfigurationBuilder.getBeanDefinition());
}
}
flowConfigurationBuilder.addConstructorArgValue(portConfigList);

View File

@@ -23,6 +23,7 @@ import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
import org.springframework.integration.flow.Flow;
import org.springframework.integration.flow.config.FlowUtils;
import org.springframework.util.xml.DomUtils;
import org.w3c.dom.Element;
/**
@@ -34,6 +35,16 @@ public class FlowParser implements BeanDefinitionParser {
@Override
public BeanDefinition parse(Element element, ParserContext parserContext) {
Element props = DomUtils.getChildElementByTagName(element,"props");
if(element.hasAttribute("properties") && props !=null) {
parserContext.getReaderContext().error(
"Element cannot have both 'properties' attribute and inner 'props' element",element);
}
BeanDefinitionBuilder flowBuilder = BeanDefinitionBuilder.genericBeanDefinition(Flow.class);
String id = element.getAttribute("id");
BeanDefinitionBuilder flowOutputChannelBuilder = BeanDefinitionBuilder
@@ -45,11 +56,17 @@ public class FlowParser implements BeanDefinitionParser {
IntegrationNamespaceUtils.setValueIfAttributeDefined(flowBuilder, element, "referenced-bean-locations");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(flowBuilder, element, "properties");
IntegrationNamespaceUtils.setValueIfAttributeDefined(flowBuilder, element, "help");
IntegrationNamespaceUtils.setValueIfAttributeDefined(flowBuilder, element, "flow-id");
IntegrationNamespaceUtils.setValueIfAttributeDefined(flowBuilder, element, "flow-id");
if (props != null) {
flowBuilder.addPropertyValue("properties",parserContext.getDelegate().parsePropsElement(props));
}
BeanDefinition beanDefinition = flowBuilder.getBeanDefinition();
parserContext.getRegistry().registerBeanDefinition(id, beanDefinition);
return beanDefinition;
}
}

View File

@@ -1,49 +1,58 @@
<?xml version="1.0" encoding="UTF-8"?>
<xsd:schema xmlns="http://www.springframework.org/schema/integration/flow"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:tool="http://www.springframework.org/schema/tool"
targetNamespace="http://www.springframework.org/schema/integration/flow"
elementFormDefault="qualified" attributeFormDefault="unqualified">
xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:bean="http://www.springframework.org/schema/beans"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:tool="http://www.springframework.org/schema/tool"
targetNamespace="http://www.springframework.org/schema/integration/flow"
elementFormDefault="qualified" attributeFormDefault="unqualified">
<xsd:import namespace="http://www.springframework.org/schema/tool" />
<xsd:element name="flow">
<xsd:annotation>
<xsd:documentation><![CDATA[
<xsd:import namespace="http://www.springframework.org/schema/beans" />
<xsd:import namespace="http://www.springframework.org/schema/integration" />
<xsd:element name="flow">
<xsd:annotation>
<xsd:documentation><![CDATA[
Defines an integration flow
]]></xsd:documentation>
</xsd:annotation>
<xsd:complexType>
<xsd:attribute name="id" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation><![CDATA[
</xsd:annotation>
<xsd:complexType>
<xsd:sequence>
<xsd:element ref="bean:props" minOccurs="0"
maxOccurs="unbounded" />
</xsd:sequence>
<xsd:attribute name="id" type="xsd:string"
use="required">
<xsd:annotation>
<xsd:documentation><![CDATA[
The name of the referenced flow
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="properties" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="properties" type="xsd:string"
use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
An optional reference to a properties object containing optional or required properties provided to configure the flow
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="referenced-bean-locations" type="xsd:string"
use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="referenced-bean-locations"
type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
An optional list of config locations containing optional or required bean definitions referenced by the flow
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="help" type="xsd:boolean"
use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="help" type="xsd:boolean"
use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
Display port configuration discription
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="flow-id" type="xsd:string"
use="optional">
<xsd:annotation>
@@ -52,101 +61,136 @@
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
</xsd:element>
</xsd:complexType>
</xsd:element>
<xsd:element name="outbound-gateway">
<xsd:complexType>
<xsd:attribute name="input-channel" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type type="org.springframework.integration.core.MessageChannel"/>
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="output-channel" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type type="org.springframework.integration.core.MessageChannel"/>
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="flow" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
<xsd:element name="outbound-gateway">
<xsd:complexType>
<xsd:attribute name="input-channel" type="xsd:string"
use="optional">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.core.MessageChannel" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="output-channel" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.core.MessageChannel" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="flow" type="xsd:string"
use="required">
<xsd:annotation>
<xsd:documentation>
<![CDATA[
The name of the referenced flow]]>
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="input-port" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="input-port" type="xsd:string"
use="optional">
<xsd:annotation>
<xsd:documentation>
<![CDATA[
The name of the input port]]>
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
</xsd:element>
<xsd:element name="flow-configuration">
<xsd:annotation>
<xsd:documentation><![CDATA[
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
</xsd:element>
<xsd:element name="flow-configuration">
<xsd:annotation>
<xsd:documentation><![CDATA[
Defines a flow configuration (metadata exposed by the flow provider)
]]></xsd:documentation>
</xsd:annotation>
<xsd:complexType>
<xsd:sequence>
<xsd:choice minOccurs="1" maxOccurs="unbounded">
<xsd:element name="port-mapping-ref">
<xsd:complexType>
<xsd:attribute name="bean" type="xsd:string" />
</xsd:complexType>
</xsd:element>
<xsd:element name="port-mapping" type="PortMappingType" />
</xsd:choice>
</xsd:sequence>
</xsd:complexType>
</xsd:element>
</xsd:annotation>
<xsd:complexType>
<xsd:sequence>
<xsd:choice minOccurs="1" maxOccurs="unbounded">
<xsd:element name="port-mapping-ref">
<xsd:complexType>
<xsd:attribute name="bean"
type="xsd:string" />
</xsd:complexType>
</xsd:element>
<xsd:element name="port-mapping" type="PortMappingType">
<xsd:annotation>
<xsd:documentation><![CDATA[
<xsd:element name="port-mapping" type="PortMappingType" />
</xsd:choice>
</xsd:sequence>
</xsd:complexType>
</xsd:element>
<xsd:element name="port-mapping" type="PortMappingType">
<xsd:annotation>
<xsd:documentation><![CDATA[
Defines an integration flow port mapping configuraiotn
]]></xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:annotation>
</xsd:element>
<xsd:complexType name="BasePortType">
<xsd:annotation>
<xsd:documentation><![CDATA[
Defines an integration flow port binding
]]></xsd:documentation>
</xsd:annotation>
<xsd:attribute name="name" type="xsd:string" use="required" />
<xsd:attribute name="channel" type="xsd:string" use="required" />
</xsd:complexType>
<xsd:complexType name="PortMappingType">
<xsd:sequence>
<xsd:element name="input-port" minOccurs="0"
maxOccurs="1" type="BasePortType"/>
<xsd:element name="output-port" minOccurs="0"
maxOccurs="unbounded" type="BasePortType" />
</xsd:sequence>
<xsd:attribute name="input-channel" type="xsd:string"
use="optional">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.core.MessageChannel" />
</tool:annotation>
</xsd:appinfo>
<xsd:documentation>
The receiving Message channel of this endpoint
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="output-channel" type="xsd:string"
use="optional">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.core.SubscribableChannel" />
</tool:annotation>
</xsd:appinfo>
<xsd:documentation>
The receiving Message channel of this endpoint
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<xsd:complexType name="BasePortType">
<xsd:annotation>
<xsd:documentation><![CDATA[
Defines an integration flow port binding
]]></xsd:documentation>
</xsd:annotation>
<xsd:attribute name="name" type="xsd:string" use="required" />
<xsd:attribute name="channel" type="xsd:string"
use="required" />
</xsd:complexType>
<xsd:complexType name="PortMappingType">
<xsd:sequence>
<xsd:element name="input-port" minOccurs="1" maxOccurs="1"
nillable="false" type="BasePortType">
</xsd:element>
<xsd:element name="output-port" minOccurs="0" maxOccurs="unbounded" type="BasePortType"/>
</xsd:sequence>
</xsd:complexType>
</xsd:schema>

View File

@@ -25,71 +25,77 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.core.PollableChannel;
import org.springframework.integration.flow.Flow;
import org.springframework.integration.message.GenericMessage;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
*
* @author David Turanski
*
*
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:/FlowClientNamespaceTest-context.xml")
public class FlowClientNamespaceTest {
@Autowired
@Qualifier("another-input")
MessageChannel gatewayInput;
@Autowired
@Qualifier("another-output")
PollableChannel gatewayOutput;
@Test
public void testGateway(){
Message<String> msg = new GenericMessage<String>("hello");
gatewayInput.send(msg);
Message<?> reply = gatewayOutput.receive();
assertNotNull(reply);
@Autowired
@Qualifier("inputC1")
MessageChannel gatewayInput;
@Autowired
@Qualifier("outputC1")
PollableChannel gatewayOutput;
@Test
public void testGateway() {
Message<String> msg = new GenericMessage<String>("hello");
gatewayInput.send(msg);
Message<?> reply = gatewayOutput.receive();
assertNotNull(reply);
}
@Autowired
@Qualifier("another-input2")
MessageChannel gatewayInput2;
@Autowired
@Qualifier("another-output2")
PollableChannel gatewayOutput2;
@Test
public void testOutboundGateway(){
Message<String> msg1 = new GenericMessage<String>("hello");
Message<String> msg2 = new GenericMessage<String>("world");
Message<?> reply = null;
gatewayInput2.send(msg1);
reply = gatewayOutput2.receive();
assertNotNull(reply);
assertEquals("gateway-output",reply.getHeaders().get("flow.output.port"));
assertEquals("yeah!",reply.getHeaders().get("gateway"));
gatewayInput2.send(msg2);
reply = gatewayOutput2.receive();
assertNotNull(reply);
assertEquals("gateway-discard",reply.getHeaders().get("flow.output.port"));
assertEquals("yeah!",reply.getHeaders().get("gateway"));
gatewayInput2.send(msg1);
reply = gatewayOutput2.receive();
assertNotNull(reply);
assertEquals("gateway-output",reply.getHeaders().get("flow.output.port"));
assertEquals("yeah!",reply.getHeaders().get("gateway"));
@Autowired
@Qualifier("inputC2")
MessageChannel gatewayInput2;
@Autowired
@Qualifier("outputC2")
PollableChannel gatewayOutput2;
@Test
public void testOutboundGateway() {
Message<String> msg1 = new GenericMessage<String>("hello");
Message<String> msg2 = new GenericMessage<String>("world");
Message<?> reply = null;
gatewayInput2.send(msg1);
reply = gatewayOutput2.receive();
assertNotNull(reply);
assertEquals("gateway-output", reply.getHeaders().get("flow.output.port"));
assertEquals("yeah!", reply.getHeaders().get("gateway"));
gatewayInput2.send(msg2);
reply = gatewayOutput2.receive();
assertNotNull(reply);
assertEquals("gateway-discard", reply.getHeaders().get("flow.output.port"));
assertEquals("yeah!", reply.getHeaders().get("gateway"));
gatewayInput2.send(msg1);
reply = gatewayOutput2.receive();
assertNotNull(reply);
assertEquals("gateway-output", reply.getHeaders().get("flow.output.port"));
assertEquals("yeah!", reply.getHeaders().get("gateway"));
}
@Autowired
@Qualifier("flowWithProps")
Flow flowWithProps;
@Test
public void testFlowWithInnerProps() {
assertEquals("val1",flowWithProps.getProperties().getProperty("key1"));
}
}

View File

@@ -18,9 +18,13 @@ package org.springframework.integration.flow.config.xml;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.util.Iterator;
import java.util.Map;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.integration.flow.FlowConfiguration;
import org.springframework.integration.flow.PortConfiguration;
import org.springframework.test.context.ContextConfiguration;
@@ -35,10 +39,15 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@ContextConfiguration("classpath:/FlowConfigNamespaceTest-context.xml")
public class FlowConfigNamespaceTest {
@Autowired
FlowConfiguration flowConfiguration;
ApplicationContext applicationContext;
@Test
public void test() {
Map<String,FlowConfiguration> flowConfigurations = applicationContext.getBeansOfType(FlowConfiguration.class);
Iterator<FlowConfiguration> iterator = flowConfigurations.values().iterator();
FlowConfiguration flowConfiguration = iterator.next();
assertNotNull(flowConfiguration.getPortConfigurations());
assertEquals(2, flowConfiguration.getPortConfigurations().size());
PortConfiguration pc0 = flowConfiguration.getPortConfigurations().get(0);
@@ -47,6 +56,10 @@ public class FlowConfigNamespaceTest {
assertEquals("subflow-output", pc0.getOutputChannel("output"));
assertEquals(1, pc0.getOutputPortNames().size());
flowConfiguration = iterator.next();
assertNotNull(flowConfiguration.getPortConfigurations());
assertEquals (1, flowConfiguration.getPortConfigurations().size());
}
}

View File

@@ -1,28 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-flow="http://www.springframework.org/schema/integration/flow"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-flow="http://www.springframework.org/schema/integration/flow"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/flow http://www.springframework.org/schema/integration/flow/spring-integration-flow-2.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">
<!-- Instantiate the flow (folded into its own application context) -->
<int-flow:flow id="subflow1" />
<int-flow:outbound-gateway flow="subflow1" input-channel="another-input"
output-channel="another-output" input-port="gateway-input"/>
<!-- Instantiate the flow (folded into its own application context) -->
<int-flow:flow id="subflow1" />
<int-flow:outbound-gateway flow="subflow1"
input-channel="inputC1" output-channel="outputC1" input-port="gateway-input" />
<int:chain input-channel="inputC2" output-channel="outputC2">
<int-flow:outbound-gateway flow="subflow1"
input-port="gateway-input" />
</int:chain>
<int:channel id="outputC1">
<int:queue />
</int:channel>
<int:channel id="outputC2">
<int:queue />
</int:channel>
<int:chain input-channel="another-input2" output-channel="another-output2">
<int-flow:outbound-gateway flow="subflow1" input-port="gateway-input"/>
</int:chain>
<int:channel id="another-output">
<int:queue />
</int:channel>
<int:channel id="another-output2">
<int:queue />
</int:channel>
<int-flow:flow id="flowWithProps" flow-id="subflow1">
<props>
<prop key="key1">val1</prop>
</props>
</int-flow:flow>
</beans>

View File

@@ -2,11 +2,12 @@
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int-flow="http://www.springframework.org/schema/integration/flow"
xsi:schemaLocation="http://www.springframework.org/schema/integration/flow http://www.springframework.org/schema/integration/flow/spring-integration-flow-2.0.xsd
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/integration/flow http://www.springframework.org/schema/integration/flow/spring-integration-flow-2.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<int-flow:flow-configuration>
<int-flow:port-mapping>
<int-flow:input-port name="input"
channel="subflow-input" />
@@ -32,4 +33,8 @@
<int-flow:output-port name="output2"
channel="subflow-output2" />
</int-flow:port-mapping>
<int-flow:flow-configuration>
<int-flow:port-mapping input-channel="inputChannel" output-channel="outputChannel"/>
</int-flow:flow-configuration>
</beans>

View File

@@ -1,24 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-flow="http://www.springframework.org/schema/integration/flow"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-flow="http://www.springframework.org/schema/integration/flow"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/flow http://www.springframework.org/schema/integration/flow/spring-integration-flow-2.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd">
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<util:properties id="flowProperties">
<prop key="key1">val1</prop>
</util:properties>
<!-- Instantiate the flow -->
<int-flow:flow id="subflow2" referenced-bean-locations="classpath:ref-bean-config.xml" properties="flowProperties"/>
<int-flow:flow id="subflow2" referenced-bean-locations="classpath:ref-bean-config.xml">
<props>
<prop key="key1">val1</prop>
</props>
</int-flow:flow>
<!-- input port not required if only one -->
<int-flow:outbound-gateway
flow="subflow2"
<int-flow:outbound-gateway flow="subflow2"
input-channel="inputC"
output-channel="outputC"/>

View File

@@ -10,9 +10,7 @@
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd">
<int-flow:flow-configuration>
<int-flow:port-mapping>
<int-flow:input-port name="input" channel="flow-input"/>
</int-flow:port-mapping>
<int-flow:port-mapping input-channel="flow-input"/>
</int-flow:flow-configuration>
<int:bridge input-channel="flow-input" output-channel="nullChannel"/>

View File

@@ -10,10 +10,7 @@
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd">
<int-flow:flow-configuration>
<int-flow:port-mapping>
<int-flow:input-port name="input" channel="flow-input"/>
<int-flow:output-port name="output" channel="flow-output"/>
</int-flow:port-mapping>
<int-flow:port-mapping input-channel="flow-input" output-channel="flow-output"/>
</int-flow:flow-configuration>
<int:filter input-channel="flow-input"

View File

@@ -15,10 +15,7 @@
<context:property-placeholder/>
<int-flow:flow-configuration>
<int-flow:port-mapping>
<int-flow:input-port name="input" channel="subflow-input" />
<int-flow:output-port name="output" channel="subflow-output" />
</int-flow:port-mapping>
<int-flow:port-mapping input-channel="subflow-input" output-channel="subflow-output"/>
</int-flow:flow-configuration>
<int:channel id="subflow-output" />

View File

@@ -12,11 +12,9 @@
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
<int-flow:flow-configuration>
<int-flow:port-mapping>
<int-flow:input-port name="input" channel="subflow-input"/>
<int-flow:output-port name="output" channel="subflow-output"/>
</int-flow:port-mapping>
</int-flow:flow-configuration>
<int-flow:port-mapping input-channel="subflow-input" output-channel="subflow-output"/>
</int-flow:flow-configuration>
<!-- Throws exception -->