diff --git a/README b/README
index b8def02..ea81b66 100644
--- a/README
+++ b/README
@@ -1,66 +1,145 @@
+Spring Integration Flow
+-----------------------
+
+NOTE: If you want to get right to the code, see the unit tests and check out (literally :) the spring-integration-flow-samples project
+
Goals
---------
-This component is exploring mechanisms to encapsulate a referenced Spring Integration message flow as a component.
-A flow is a Spring Integration message flow intended for reuse. A flow is accessed via logical "ports" which map
-to internal channels.
+Spring Integration components already support common enterprise integration patterns. Sometimes it is desirable to
+create common message flows which implement higher level messaging patterns or "cross cutting concerns" for your
+application environment. This is especially true in more complex applications.
-A flow may expose multiple inputs and multiple outputs. A port mapping is defined for each input and has multiple
-outputs associated with it. See src/test/resources/META-INF/spring/integration/flows/subflow1/subflow1-context.xml, for example.
+Out of the box, a best practice is to define a common message flow in its own bean definition
+file which may be imported into other message flows. This approach however has some limitations:
+- It's input and output channels are referenced in every consuming flow. Each common flow must ensure unique channel names
+- True encapsuation is impossible as any internal channels and components are exposed to the consuming flow. Note that these must have
+unique names as well
+- Since a common flow is statically bound to channels, it cannot be used in a chain without implementing some type of service-activator/
+gateway wrapper.
+- Since the common flow is part of the same application context (an imported resource), is not practical to configure multiple instances with
+different property values, bean definitions, etc. (As of Spring 3.1 it may possible to do something with environment profiles, but would be overly complex)
-Generally, a message flow may behave like a router. For example, a flow may define
-a primary output and a discard output. Additionally, it may act like a delayer, providing no immediate response. Or it
-could act as an outbound channel adapter, providing no output.
+The spring-integration-flow module provides a way to implement and use common flows while addressing the above limitations. A flow is an
+abstraction of a Spring Integration component which is itself implemented with Spring Integration. In general, a flow may expose multiple inputs and multiple outputs,
+called ports. Each port binds to a channel internal to the flow. So a flow has input ports and output ports. The most common configuration is expected to define
+one input port and output port. Other likely configurations are one input and zero to a small number of outputs.
+
+A flow may behave like a router. For example, a flow may define a primary output port for normal processing and a discard port for exceptional cases.
+Alternately, it may act like a delayer, providing no immediate response. Or it could act as an outbound channel adapter - a strictly one way or fire-and-forget flow.
+
+The goal is to support these, and potentially other semantics while providing better encapsulation and configuration options. Configuration is provided via properties
+and or referenced bean definitions. Each flow is initialized in a child application context. This allows you to configure multiple instances of the same flow differently.
+The flow is not bound to input and output channels in the consumer context and may be naturally invoked via a flow outbound gateway. The outbound gateway may be used within
+a chain. It is also possible in theory to compose flows of other flows (NOTE: This hasn't been tested yet).
-The goal is to support these, and possibly other semantics. Additional goals are:
-- Encapsulation: the flow channels, and component should not be included the consumer application context. The consumer need
-not know how the flow is configured (i.e, it is contained in a jar).
-- Configuration: The flow may reference properties or beans to be provided by the consumer, e.g, a generic XML processor may require an OXM marshaller.
- - The flow should provide a description, in terms of ports, properties, and beans it exposes
- - It should be easy to implement or wrap an existing configuration as a flow and provide a better option than simple importing
- a spring configuration file and sending a message to one of its input channels
Usage
--------
-The flow consumer instantiates a flow and defines one or more flow outbound-gateways associated to an input port:
+--------
+The flow consumer instantiates a flow and configures one or more flow outbound-gateways.
+Instantiating a flow is very simple:
-The flow also supports a properties attribute and a referenced-bean-locations attribute used to inject properties and a list of configuration files respectively.
-Additionally any bean definition may be or property in the parent context may be inherited by the flow.
+The above bean definition instantiates a flow defined as "subflow1". The flow id references an existing flow implementation, presumably packaged in a
+separate jar. The flow element also provides optional 'properties' attribute and a 'referenced-bean-locations' attributes to inject properties and a list of
+bean definition locations respectively. Note that any bean definition or property in the parent context may also be referenced (inherited) by the flow. As
+an alternative to the properties attribute which references a util:properties bean, A properties object may be configured as an inner bean. The following are
+functionally equivalent:
+-----------------------------------------------------------
+
+
-An optional help attribute will output the flow's documentation to the STDOUT if set to 'true'.
+---------------------------------------------------------
+
+
+
+
+
+
+
-
-A message sent on the input-channel is delegated to the flow. The message on the output-channel is a response from one of the output
-ports. The output port name is contained in the response header 'flow.output.port'
+By default the id attribute is used as the flow-id.
-The input-port attribute is optional -- if the flow defines a single input port it will be mapped by default.
+An optional help attribute, if set to true will output the flow's description document (if there is one) to the STDOUT.
+The flow is invoked via an outbound-gateway:
+
+
+
+
+
+A message sent on the gateway's input-channel is delegated to the flow. The message on the output-channel is a response from one of the flows output
+ports. The output port name is contained in the response message header 'flow.output.port'
+
+NOTE: An optional input-port attribute is available if the flow defines multiple inputs, otherwise the input port it will be automatically mapped.
+
+Flows may also be used in a chain just as any AbstractReplyProducingMessageHandler:
+
+
+
+
+
+
-Flow Implementation
+Implementing a Flow
-------------------
-The flow element creates a Flow instance (eventually, configured with properties, referenced beans, etc.) The flow id is used to derive the flow's
-spring bean definition file by convention (classpath:META-INF/spring/flows/[flow-id]/*.xml). This bean definition file and any referenced bean locations
-will be used to create a child application context. The flow context must provide a FlowConfiguration containing the metadata describing the input ports,
-output ports, and referenced beans and properties.
+The flow element is used to locate the flow's spring bean definition file(s) by convention (classpath:META-INF/spring/flows/[flow-id]/*.xml). It's bean definition
+files and any referenced-bean-locations will be used to create a child application context. The flow context must provide a FlowConfiguration bean which defines the
+flows input and output ports and maps them to internal input and output channels.
-The flow implementer should also create a text file classpath:META-INF/spring/flows/[flow-id]/flow.doc which will contain the 'help' contents.
+Namespace support is provided for FlowConfiguration. In the simplest case, a flow implementation encapsulates a Spring Integration flow with a single input-channel and
+a single output-channel. The required configuration is simply declared:
-Currently, all defined outputs are bridged to a PublishSubscribeChannel which acts as a single 'flowOutputChannel'.
-Each flow outbound-gateway instance is backed by a FlowMessageHandler that bridges the 'flow output channel' to its own QueueChannel. This emulates
-a JMS topic. Each flow message handler sends the request message to the flow input channel corresponding to the input port and checks
-its queue for a response. A correlation id (flow conversation id) is used to correlate the response to the request.
+
+
+
-If the flow message handler catches an exception, it will convert it to an ErrorMessage response. Alternately, the flow can map its errorChannel to an output port
+Note: The output-channel attribute is optional if the flow does not produce a response.
-The response message contains a 'flow.output.port' header indicating which output port provided the response.
+For more complex scenarios, the flow configuration supports multiple port-mappings, each bound to a single input channel and 0 or more output channels. A configuration
+for specifying multiple outputs looks like:
-Currently flow input and output channels must inherit from SubscribableChannel, e.g., DirectChannel or PublishSubscribe channel.
\ No newline at end of file
+
+
+
+
+
+
+
+
+If the flow defines multiple inputs, then multiple port-mapping elements must be configured. Additionally the flow client must specify the input-port in the outbound-gateway.
+
+Flow Description File
+----------------------
+The user-friendly flow implementer may also create a text file classpath:META-INF/spring/flows/[flow-id]/flow.doc which describes the flow. Its contents will be written to
+STDOUT if the 'help' attribute on the client's flow declaration is set to true.
+
+FlowMessageHandler Internals
+------------------------------
+Currently, all defined outputs are automatically bridged to a PublishSubscribeChannel which acts as a single output channel for the flow. Each flow outbound-gateway instance
+is backed by a FlowMessageHandler that bridges the flow output channel to its own QueueChannel instance. This emulates a JMS topic. Each FlowMessageHandler subscribes to the
+flow. It sends the request message to the flows input channel mapped to the input port and checks its queue for a response. If there are multiple FlowMessageHandlers subscribed
+to the flow, each receive a response on the flow output channel, a correlation id (flow conversation id) is internally generated to correlate the response message to the request message.
+
+If the FlowMessageHandler catches an exception, it will convert it to an ErrorMessage response. Alternately, the flow can map its errorChannel to an output port
+
+Currently flow input and output channels must inherit from SubscribableChannel, e.g., DirectChannel or PublishSubscribe channel.
\ No newline at end of file
diff --git a/src/main/java/org/springframework/integration/flow/ChannelNamePortConfiguration.java b/src/main/java/org/springframework/integration/flow/ChannelNamePortConfiguration.java
index 1be7838..70ec359 100644
--- a/src/main/java/org/springframework/integration/flow/ChannelNamePortConfiguration.java
+++ b/src/main/java/org/springframework/integration/flow/ChannelNamePortConfiguration.java
@@ -19,54 +19,69 @@
package org.springframework.integration.flow;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
public class ChannelNamePortConfiguration implements PortConfiguration {
- private PortMetadata inputPortMetadata;
- private List outputPortMetadataList;
+ private PortMetadata inputPortMetadata;
+ private List outputPortMetadataList;
- public ChannelNamePortConfiguration(PortMetadata inputPortMetadata, List outputPortMetadataList) {
- this.outputPortMetadataList = outputPortMetadataList;
- this.inputPortMetadata = inputPortMetadata;
- }
+ public ChannelNamePortConfiguration(PortMetadata inputPortMetadata, List outputPortMetadataList) {
+ this.outputPortMetadataList = outputPortMetadataList;
+ this.inputPortMetadata = inputPortMetadata;
+ }
- @Override
- public String getInputPortName() {
- return this.inputPortMetadata.getPortName();
- }
+ public ChannelNamePortConfiguration(String inputChannelName, String outputChannelName) {
+ this.inputPortMetadata = new PortMetadata("input", inputChannelName);
- @Override
- public String getInputChannel() {
- return this.inputPortMetadata.getChannelName();
- }
-
-
- @Override
- public String getOutputChannel(String portName) {
- PortMetadata portMetadata = (PortMetadata) find(portName);
- if (portMetadata != null) {
- return portMetadata.getChannelName();
- }
- return null;
- }
-
- @Override
- public List getOutputPortNames() {
- List results = new ArrayList();
- for (PortMetadata portMetadata : outputPortMetadataList ) {
- results.add(portMetadata.getPortName());
- }
- return results;
- }
-
- public PortMetadata find(String portName){
- for (PortMetadata portMetadata : outputPortMetadataList ) {
- if (portName.equals(portMetadata.getPortName())){
- return portMetadata;
- }
+ if (outputChannelName != null) {
+ PortMetadata outputPortMetadata = new PortMetadata("output", outputChannelName);
+ this.outputPortMetadataList = Collections.singletonList(outputPortMetadata);
+ } else {
+ // this.outputPortMetadataList = new ArrayList();
}
- return null;
- }
+ }
+
+ @Override
+ public String getInputPortName() {
+ return this.inputPortMetadata.getPortName();
+ }
+
+ @Override
+ public String getInputChannel() {
+ return this.inputPortMetadata.getChannelName();
+ }
+
+ @Override
+ public String getOutputChannel(String portName) {
+ PortMetadata portMetadata = (PortMetadata) findOutputPort(portName);
+ if (portMetadata != null) {
+ return portMetadata.getChannelName();
+ }
+ return null;
+ }
+
+ @Override
+ public List getOutputPortNames() {
+ List results = new ArrayList();
+ if (outputPortMetadataList != null) {
+ for (PortMetadata portMetadata : outputPortMetadataList) {
+ results.add(portMetadata.getPortName());
+ }
+ }
+ return results;
+ }
+
+ private PortMetadata findOutputPort(String portName) {
+ if (outputPortMetadataList != null) {
+ for (PortMetadata portMetadata : outputPortMetadataList) {
+ if (portName.equals(portMetadata.getPortName())) {
+ return portMetadata;
+ }
+ }
+ }
+ return null;
+ }
}
diff --git a/src/main/java/org/springframework/integration/flow/Flow.java b/src/main/java/org/springframework/integration/flow/Flow.java
index c6b07ee..1ef9c8c 100644
--- a/src/main/java/org/springframework/integration/flow/Flow.java
+++ b/src/main/java/org/springframework/integration/flow/Flow.java
@@ -34,164 +34,171 @@ import org.springframework.util.StringUtils;
*/
public class Flow implements InitializingBean, BeanNameAware, ChannelResolver, ApplicationContextAware {
- private static Log logger = LogFactory.getLog(Flow.class);
+ private static Log logger = LogFactory.getLog(Flow.class);
- private volatile ClassPathXmlApplicationContext flowContext;
-
- private ApplicationContext applicationContext;
+ private volatile ClassPathXmlApplicationContext flowContext;
- private volatile FlowConfiguration flowConfiguration;
+ private ApplicationContext applicationContext;
- private volatile String[] configLocations;
+ private volatile FlowConfiguration flowConfiguration;
- private volatile String[] referencedBeanLocations;
+ private volatile String[] configLocations;
- private volatile Properties flowProperties;
+ private volatile String[] referencedBeanLocations;
- private volatile String beanName;
-
- private volatile String flowId;
+ private volatile Properties flowProperties;
- private volatile ChannelResolver flowChannelResolver;
+ private volatile String beanName;
- private volatile PublishSubscribeChannel flowOutputChannel;
+ private volatile String flowId;
- private volatile boolean help;
+ private volatile ChannelResolver flowChannelResolver;
- public Flow() {
+ private volatile PublishSubscribeChannel flowOutputChannel;
- }
+ private volatile boolean help;
- public Flow(String[] configLocations) {
- this.configLocations = configLocations;
- }
+ public Flow() {
- @Override
- public void afterPropertiesSet() {
-
- if (this.flowId == null){
- this.flowId = this.beanName;
- }
-
- if (this.help) {
+ }
+
+ public Flow(Properties flowProperties, String[] configLocations) {
+ this.flowProperties = flowProperties;
+ this.configLocations = configLocations;
+ }
+
+ public Flow(String[] configLocations) {
+ this.configLocations = configLocations;
+ }
+
+ @Override
+ public void afterPropertiesSet() {
+
+ if (this.flowId == null) {
+ this.flowId = this.beanName;
+ }
+
+ if (this.help) {
System.out.println(FlowUtils.getDocumentation(this.flowId));
}
- if (configLocations == null) {
- configLocations = new String[] { String.format(
- "classpath:META-INF/spring/integration/flows/%s/*.xml", this.flowId) };
- }
+ if (configLocations == null) {
+ configLocations = new String[] { String.format("classpath:META-INF/spring/integration/flows/%s/*.xml",
+ this.flowId) };
+ }
- if (referencedBeanLocations != null) {
- configLocations = (String[]) ArrayUtils.addAll(configLocations, referencedBeanLocations);
- }
+ if (referencedBeanLocations != null) {
+ configLocations = (String[]) ArrayUtils.addAll(configLocations, referencedBeanLocations);
+ }
- logger.debug("instantiating flow context from configLocations ["
- + StringUtils.arrayToCommaDelimitedString(configLocations) + "]");
+ logger.debug("instantiating flow context from configLocations ["
+ + StringUtils.arrayToCommaDelimitedString(configLocations) + "]");
- Assert.notEmpty(configLocations, "configLocations cannot be empty");
+ Assert.notEmpty(configLocations, "configLocations cannot be empty");
- flowContext = new ClassPathXmlApplicationContext(applicationContext);
+ flowContext = new ClassPathXmlApplicationContext(applicationContext);
+
+ addReferencedProperties();
- addReferencedProperties();
-
this.flowContext.setConfigLocations(configLocations);
-
- this.flowContext.refresh();
-
+
+ this.flowContext.refresh();
+
this.flowConfiguration = flowContext.getBean(FlowConfiguration.class);
- Assert.notNull(flowConfiguration, "flow context does not contain a flow configuration");
+ Assert.notNull(flowConfiguration, "flow context does not contain a flow configuration");
- validatePortMapping();
+ validatePortMapping();
- this.flowChannelResolver = new BeanFactoryChannelResolver(flowContext);
-
- bridgeMessagingPorts();
+ this.flowChannelResolver = new BeanFactoryChannelResolver(flowContext);
- }
+ bridgeMessagingPorts();
- public FlowConfiguration getFlowConfiguration() {
- return this.flowConfiguration;
- }
+ }
- @Override
- public void setBeanName(String name) {
- this.beanName = name;
+ public FlowConfiguration getFlowConfiguration() {
+ return this.flowConfiguration;
+ }
- }
+ @Override
+ public void setBeanName(String name) {
+ this.beanName = name;
- public String getBeanName() {
- return this.beanName;
- }
+ }
- public void setFlowId(String flowId) {
+ public String getBeanName() {
+ return this.beanName;
+ }
+
+ public void setFlowId(String flowId) {
this.flowId = flowId;
}
public void setReferencedBeanLocations(String[] referencedBeanLocations) {
- this.referencedBeanLocations = referencedBeanLocations;
- }
+ this.referencedBeanLocations = referencedBeanLocations;
+ }
- public void setProperties(Properties flowProperties) {
- this.flowProperties = flowProperties;
- }
+ public void setProperties(Properties flowProperties) {
+ this.flowProperties = flowProperties;
+ }
+
+ public Properties getProperties() {
+ return this.flowProperties;
+ }
- public void setHelp(boolean help) {
- this.help = help;
- }
+ public void setHelp(boolean help) {
+ this.help = help;
+ }
- public PublishSubscribeChannel getFlowOutputChannel() {
- return flowOutputChannel;
- }
+ public PublishSubscribeChannel getFlowOutputChannel() {
+ return flowOutputChannel;
+ }
- public void setFlowOutputChannel(PublishSubscribeChannel flowOutputChannel) {
- this.flowOutputChannel = flowOutputChannel;
- }
+ public void setFlowOutputChannel(PublishSubscribeChannel flowOutputChannel) {
+ this.flowOutputChannel = flowOutputChannel;
+ }
- @Override
- public MessageChannel resolveChannelName(String channelName) {
- return flowChannelResolver.resolveChannelName(channelName);
- }
+ @Override
+ public MessageChannel resolveChannelName(String channelName) {
+ return flowChannelResolver.resolveChannelName(channelName);
+ }
+ private void addReferencedProperties() {
+ if (flowProperties != null) {
+ PropertySource> propertySource = new PropertiesPropertySource("flowProperties", flowProperties);
- private void addReferencedProperties() {
- if (flowProperties != null) {
- PropertySource> propertySource = new PropertiesPropertySource("flowProperties", flowProperties);
-
- MutablePropertySources propertySources = flowContext.getEnvironment().getPropertySources();
- propertySources.addLast(propertySource);
- }
+ MutablePropertySources propertySources = flowContext.getEnvironment().getPropertySources();
+ propertySources.addLast(propertySource);
+ }
- }
+ }
- private void validatePortMapping() {
- Assert.notEmpty(this.flowConfiguration.getPortConfigurations(),
- "flow configuration contains no port configurations");
- }
+ private void validatePortMapping() {
+ Assert.notEmpty(this.flowConfiguration.getPortConfigurations(),
+ "flow configuration contains no port configurations");
+ }
- private void bridgeMessagingPorts() {
+ private void bridgeMessagingPorts() {
- /*
- * create a bridge for each target output port to the flow outputChannel
- */
- for (PortConfiguration targetPortConfiguration : this.getFlowConfiguration()
- .getPortConfigurations()) {
- for (String outputPort : targetPortConfiguration.getOutputPortNames()) {
- String targetOutputChannelName = (String) targetPortConfiguration.getOutputChannel(outputPort);
- SubscribableChannel inputChannel = (SubscribableChannel) resolveChannelName(targetOutputChannelName);
+ /*
+ * create a bridge for each target output port to the flow outputChannel
+ */
+ for (PortConfiguration targetPortConfiguration : this.getFlowConfiguration().getPortConfigurations()) {
+ for (String outputPort : targetPortConfiguration.getOutputPortNames()) {
+ String targetOutputChannelName = (String) targetPortConfiguration.getOutputChannel(outputPort);
+ SubscribableChannel inputChannel = (SubscribableChannel) resolveChannelName(targetOutputChannelName);
- ((AbstractMessageChannel)inputChannel).addInterceptor(new FlowInterceptor(outputPort));
+ ((AbstractMessageChannel) inputChannel).addInterceptor(new FlowInterceptor(outputPort));
- logger.debug("creating output bridge on [" + outputPort + "] inputChannelName = ["
- + targetOutputChannelName + "] outputChannel = [" + this.flowOutputChannel + "]");
- FlowUtils.bridgeChannels(inputChannel, this.flowOutputChannel);
- }
- }
- }
+ logger.debug("creating output bridge on [" + outputPort + "] inputChannelName = ["
+ + targetOutputChannelName + "] outputChannel = [" + this.flowOutputChannel + "]");
+ FlowUtils.bridgeChannels(inputChannel, this.flowOutputChannel);
+ }
+ }
+ }
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- this.applicationContext = applicationContext;
-
+ this.applicationContext = applicationContext;
+
}
}
diff --git a/src/main/java/org/springframework/integration/flow/config/xml/FlowConfigurationParser.java b/src/main/java/org/springframework/integration/flow/config/xml/FlowConfigurationParser.java
index 12b463b..97e377b 100644
--- a/src/main/java/org/springframework/integration/flow/config/xml/FlowConfigurationParser.java
+++ b/src/main/java/org/springframework/integration/flow/config/xml/FlowConfigurationParser.java
@@ -15,6 +15,8 @@
*/
package org.springframework.integration.flow.config.xml;
+import groovy.sql.OutParameter;
+
import java.util.List;
import org.springframework.beans.factory.config.BeanDefinition;
@@ -23,6 +25,7 @@ import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.support.ManagedList;
import org.springframework.beans.factory.xml.BeanDefinitionParser;
import org.springframework.beans.factory.xml.ParserContext;
+import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
import org.springframework.integration.flow.ChannelNamePortConfiguration;
import org.springframework.integration.flow.FlowConfiguration;
import org.springframework.integration.flow.PortMetadata;
@@ -48,9 +51,36 @@ public class FlowConfigurationParser implements BeanDefinitionParser {
ManagedList