Flow context is now a child of the main context

This commit is contained in:
David Turanski
2011-07-07 19:49:57 -04:00
parent 46d390b503
commit 07b8311521
6 changed files with 123 additions and 76 deletions

36
README
View File

@@ -22,7 +22,8 @@ or required bean definitions provided by the consumer (e.g, a generic XML proces
Usage
-------
The flow consumer instantiates a flow and defines one or more flow outbound gateways for each input port:
The flow consumer instantiates a flow and defines one or more flow outbound-gateways associated to an input port:
<int-flow:flow id="subflow1"/>
@@ -31,14 +32,27 @@ The flow consumer instantiates a flow and defines one or more flow outbound gate
A message sent on the input-channel is delegated to the flow. The message on the output-channel is a response from one of the output
ports. The output port name is contained in the response header 'flow.output.port'
Implementation
----------------
The flow element creates a Flow instance (eventually, configured with properties, referenced beans, etc.) The flow id is used to derive the flow's
spring bean definition file by convention. This bean definition file will be used to create a standalone application context which must provide a
FlowConfiguration containing the metadata about the exposed ports, beans, and properties.
Currently, all exposed outputs are bridged to a PublishSubscribeChannel which acts as a single 'flowOutputChannel'.
Each flow outbound gateway instance is backed by a FlowMessageHandler that bridges the 'flow output channel to its own QueueChannel. This is
analogous to a JMS topic. Each flow message handler sends the request message to the flow input channel corresponding to the input port and checks
its queue for a response. A correlation id is used to match the response to the request.
The input-port attribute is optional -- if the flow defines a single input port it will be mapped by default.
The flow also supports a properties attribute and a referenced-bean-locations attribute used to inject properties and a list of configuration files respectively.
Additionally any bean definition may be or property in the parent context may be inherited by the flow.
Flow Implementation
-------------------
The flow element creates a Flow instance (eventually, configured with properties, referenced beans, etc.) The flow id is used to derive the flow's
spring bean definition file by convention (META-INF/spring/flows/[flow-id]-context.xml). This bean definition file and any referenced bean locations
will be used to create a child application context. The flow context must provide a FlowConfiguration containing the metadata describing the input ports,
output ports, and referenced beans and properties.
Currently, all defined outputs are bridged to a PublishSubscribeChannel which acts as a single 'flowOutputChannel'.
Each flow outbound-gateway instance is backed by a FlowMessageHandler that bridges the 'flow output channel' to its own QueueChannel. This emulates
a JMS topic. Each flow message handler sends the request message to the flow input channel corresponding to the input port and checks
its queue for a response. A correlation id (flow conversation id) is used to correlate the response to the request.
If the flow message handler catches an exception, it will convert it to an ErrorMessage response. Alternately, the flow can map its errorChannel to an output port
The response message contains a 'flow.output.port' header indicating which output port provided the response.
Currently flow input and output channels must inherit from SubscribableChannel, e.g., DirectChannel or PublishSubscribe channel.

View File

@@ -8,9 +8,8 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.core.env.MutablePropertySources;
@@ -19,6 +18,7 @@ import org.springframework.core.env.PropertySource;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.core.SubscribableChannel;
import org.springframework.integration.flow.config.FlowUtils;
import org.springframework.integration.flow.interceptor.FlowInterceptor;
import org.springframework.integration.support.channel.BeanFactoryChannelResolver;
@@ -33,11 +33,13 @@ import org.springframework.util.StringUtils;
* @author David Turanski
*
*/
public class Flow implements InitializingBean, BeanNameAware, ChannelResolver, BeanDefinitionRegistryPostProcessor {
public class Flow implements InitializingBean, BeanNameAware, ChannelResolver, ApplicationContextAware {
protected Log logger = LogFactory.getLog(getClass());
private static Log logger = LogFactory.getLog(Flow.class);
private volatile ConfigurableApplicationContext flowContext;
private ConfigurableApplicationContext applicationContext;
private volatile FlowConfiguration flowConfiguration;
@@ -80,12 +82,12 @@ public class Flow implements InitializingBean, BeanNameAware, ChannelResolver, B
Assert.notEmpty(configLocations, "configLocations cannot be empty");
flowContext = new ClassPathXmlApplicationContext(configLocations);
flowContext = new ClassPathXmlApplicationContext(configLocations, applicationContext);
this.flowConfiguration = flowContext.getBean(FlowConfiguration.class);
Assert.notNull(flowConfiguration, "flow context does not contain a flow configuration");
if (help) {
if (this.help) {
System.out.println(displayFlowConfiguration());
}
@@ -94,6 +96,10 @@ public class Flow implements InitializingBean, BeanNameAware, ChannelResolver, B
this.flowChannelResolver = new BeanFactoryChannelResolver(flowContext);
addReferencedProperties();
bridgeMessagingPorts();
}
@@ -136,17 +142,7 @@ public class Flow implements InitializingBean, BeanNameAware, ChannelResolver, B
return flowChannelResolver.resolveChannelName(channelName);
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory arg0) throws BeansException {
// TODO Auto-generated method stub
}
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
bridgeMessagingPorts(registry);
}
/**
*
@@ -200,7 +196,8 @@ public class Flow implements InitializingBean, BeanNameAware, ChannelResolver, B
}
MutablePropertySources propertySources = flowContext.getEnvironment().getPropertySources();
propertySources.addLast(propertySource);
flowContext.refresh();
this.flowContext.refresh();
}
}
@@ -210,7 +207,7 @@ public class Flow implements InitializingBean, BeanNameAware, ChannelResolver, B
"flow configuration contains no port configurations");
}
private void bridgeMessagingPorts(BeanDefinitionRegistry registry) {
private void bridgeMessagingPorts() {
/*
* create a bridge for each target output port to the flow outputChannel
@@ -219,14 +216,20 @@ public class Flow implements InitializingBean, BeanNameAware, ChannelResolver, B
.getPortConfigurations()) {
for (String outputPort : targetPortConfiguration.getOutputPortNames()) {
String targetOutputChannelName = (String) targetPortConfiguration.getOutputChannel(outputPort);
AbstractMessageChannel inputChannel = (AbstractMessageChannel) resolveChannelName(targetOutputChannelName);
SubscribableChannel inputChannel = (SubscribableChannel) resolveChannelName(targetOutputChannelName);
inputChannel.addInterceptor(new FlowInterceptor(outputPort));
((AbstractMessageChannel)inputChannel).addInterceptor(new FlowInterceptor(outputPort));
logger.debug("creating output bridge on [" + outputPort + "] inputChannelName = ["
+ targetOutputChannelName + "] outputChannel = [" + this.flowOutputChannel + "]");
FlowUtils.createBridge(inputChannel, this.flowOutputChannel, registry);
FlowUtils.bridgeChannels(inputChannel, this.flowOutputChannel);
}
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}
}

View File

@@ -15,11 +15,9 @@
*/
package org.springframework.integration.flow.config;
import org.springframework.beans.BeansException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean;
import org.springframework.integration.core.MessageHandler;
@@ -35,8 +33,10 @@ import org.springframework.util.Assert;
*
*/
public class FlowMessageHandlerFactoryBean extends AbstractSimpleMessageHandlerFactoryBean implements
InitializingBean, BeanDefinitionRegistryPostProcessor {
InitializingBean {
private static Log logger = LogFactory.getLog(FlowMessageHandlerFactoryBean.class);
private volatile Flow flow;
private volatile String inputPortName;
@@ -73,22 +73,14 @@ public class FlowMessageHandlerFactoryBean extends AbstractSimpleMessageHandlerF
this.flowReceiveChannel = flowOutputChannel;
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
// TODO Auto-generated method stub
}
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
bridgeMessagingPorts(registry);
}
private void bridgeMessagingPorts(BeanDefinitionRegistry registry) {
FlowUtils.createBridge(this.flow.getFlowOutputChannel(), this.flowReceiveChannel, registry);
private void bridgeMessagingPorts( ) {
logger.debug("creating handler bridge on inputChannelName = ["
+ flow.getFlowOutputChannel() + "] outputChannel = [" + this.flowReceiveChannel + "]");
FlowUtils.bridgeChannels(this.flow.getFlowOutputChannel(), this.flowReceiveChannel);
}
@Override
@@ -107,6 +99,9 @@ public class FlowMessageHandlerFactoryBean extends AbstractSimpleMessageHandlerF
}
Assert.notEmpty(this.flowConfiguration.getOutputPortNames(), "flow [" + this.flow.getBeanName()
+ "] has no configured output ports");
bridgeMessagingPorts();
}
}

View File

@@ -14,12 +14,10 @@ package org.springframework.integration.flow.config;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.config.ConsumerEndpointFactoryBean;
import org.springframework.integration.core.SubscribableChannel;
import org.springframework.integration.handler.BridgeHandler;
/**
@@ -27,28 +25,25 @@ import org.springframework.integration.handler.BridgeHandler;
*
*/
public class FlowUtils {
/*
* create a bridge
/**
* Create a bridge
* @param inputChannel
* @param outputChannel
*/
public static void createBridge(MessageChannel inputChannel, MessageChannel outputChannel,
BeanDefinitionRegistry registry) {
BeanDefinitionBuilder handlerBuilder = BeanDefinitionBuilder
.genericBeanDefinition(BridgeHandler.class);
handlerBuilder.addPropertyValue("outputChannel", outputChannel);
AbstractBeanDefinition handlerBeanDefinition = handlerBuilder.getBeanDefinition();
BeanDefinitionBuilder consumerEndpointBuilder = BeanDefinitionBuilder
.genericBeanDefinition(ConsumerEndpointFactoryBean.class);
String handlerBeanName = registerBeanDefinition(handlerBeanDefinition, registry);
consumerEndpointBuilder.addPropertyReference("handler", handlerBeanName);
consumerEndpointBuilder.addPropertyValue("inputChannel", inputChannel);
registerBeanDefinition(consumerEndpointBuilder.getBeanDefinition(), registry);
}
public static void bridgeChannels(SubscribableChannel inputChannel, MessageChannel outputChannel) {
BridgeHandler bridgeHandler = new BridgeHandler();
bridgeHandler.setOutputChannel(outputChannel);
inputChannel.subscribe(bridgeHandler);
}
/**
* Register a bean with "flow" prefix
* @param beanDefinition
* @param registry
* @return
*/
public static String registerBeanDefinition(BeanDefinition beanDefinition, BeanDefinitionRegistry registry){
String beanName = BeanDefinitionReaderUtils.generateBeanName(beanDefinition, registry);
beanName = "flow."+ beanName;

View File

@@ -0,0 +1,3 @@
Manifest-Version: 1.0
Class-Path:

View File

@@ -0,0 +1,37 @@
package org.springframework.integration.flow;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import org.junit.Test;
import org.springframework.integration.Message;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.PollableChannel;
import org.springframework.integration.core.SubscribableChannel;
import org.springframework.integration.flow.config.FlowUtils;
import org.springframework.integration.message.GenericMessage;
public class FlowUtilsTest {
@Test
public void buildBridge(){
SubscribableChannel inputChannel = new DirectChannel();
SubscribableChannel outputChannel = new PublishSubscribeChannel();
PollableChannel receiveChannel = new QueueChannel();
FlowUtils.bridgeChannels(inputChannel, outputChannel);
FlowUtils.bridgeChannels(outputChannel, receiveChannel);
Message<?> message = new GenericMessage<String>("hello");
inputChannel.send(message);
Message<?> result = receiveChannel.receive(100);
assertNotNull(result);
assertSame(message, result);
}
}