From 07b8311521b06b561d2c613dd393963cd5b1081b Mon Sep 17 00:00:00 2001 From: David Turanski Date: Thu, 7 Jul 2011 19:49:57 -0400 Subject: [PATCH] Flow context is now a child of the main context --- README | 36 +++++++++----- .../integration/flow/Flow.java | 49 ++++++++++--------- .../config/FlowMessageHandlerFactoryBean.java | 33 ++++++------- .../integration/flow/config/FlowUtils.java | 41 +++++++--------- src/main/resources/META-INF/MANIFEST.MF | 3 ++ .../integration/flow/FlowUtilsTest.java | 37 ++++++++++++++ 6 files changed, 123 insertions(+), 76 deletions(-) create mode 100644 src/main/resources/META-INF/MANIFEST.MF create mode 100644 src/test/java/org/springframework/integration/flow/FlowUtilsTest.java diff --git a/README b/README index 208b58a..945f332 100644 --- a/README +++ b/README @@ -22,7 +22,8 @@ or required bean definitions provided by the consumer (e.g, a generic XML proces Usage ------- -The flow consumer instantiates a flow and defines one or more flow outbound gateways for each input port: +The flow consumer instantiates a flow and defines one or more flow outbound-gateways associated to an input port: + @@ -31,14 +32,27 @@ The flow consumer instantiates a flow and defines one or more flow outbound gate A message sent on the input-channel is delegated to the flow. The message on the output-channel is a response from one of the output ports. The output port name is contained in the response header 'flow.output.port' - -Implementation ----------------- -The flow element creates a Flow instance (eventually, configured with properties, referenced beans, etc.) The flow id is used to derive the flow's -spring bean definition file by convention. This bean definition file will be used to create a standalone application context which must provide a -FlowConfiguration containing the metadata about the exposed ports, beans, and properties. -Currently, all exposed outputs are bridged to a PublishSubscribeChannel which acts as a single 'flowOutputChannel'. -Each flow outbound gateway instance is backed by a FlowMessageHandler that bridges the 'flow output channel to its own QueueChannel. This is -analogous to a JMS topic. Each flow message handler sends the request message to the flow input channel corresponding to the input port and checks -its queue for a response. A correlation id is used to match the response to the request. \ No newline at end of file +The input-port attribute is optional -- if the flow defines a single input port it will be mapped by default. + +The flow also supports a properties attribute and a referenced-bean-locations attribute used to inject properties and a list of configuration files respectively. +Additionally any bean definition may be or property in the parent context may be inherited by the flow. + + +Flow Implementation +------------------- +The flow element creates a Flow instance (eventually, configured with properties, referenced beans, etc.) The flow id is used to derive the flow's +spring bean definition file by convention (META-INF/spring/flows/[flow-id]-context.xml). This bean definition file and any referenced bean locations +will be used to create a child application context. The flow context must provide a FlowConfiguration containing the metadata describing the input ports, +output ports, and referenced beans and properties. + +Currently, all defined outputs are bridged to a PublishSubscribeChannel which acts as a single 'flowOutputChannel'. +Each flow outbound-gateway instance is backed by a FlowMessageHandler that bridges the 'flow output channel' to its own QueueChannel. This emulates +a JMS topic. Each flow message handler sends the request message to the flow input channel corresponding to the input port and checks +its queue for a response. A correlation id (flow conversation id) is used to correlate the response to the request. + +If the flow message handler catches an exception, it will convert it to an ErrorMessage response. Alternately, the flow can map its errorChannel to an output port + +The response message contains a 'flow.output.port' header indicating which output port provided the response. + +Currently flow input and output channels must inherit from SubscribableChannel, e.g., DirectChannel or PublishSubscribe channel. \ No newline at end of file diff --git a/src/main/java/org/springframework/integration/flow/Flow.java b/src/main/java/org/springframework/integration/flow/Flow.java index d0cbfb0..ad98889 100644 --- a/src/main/java/org/springframework/integration/flow/Flow.java +++ b/src/main/java/org/springframework/integration/flow/Flow.java @@ -8,9 +8,8 @@ import org.apache.commons.logging.LogFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanNameAware; import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; -import org.springframework.beans.factory.support.BeanDefinitionRegistry; -import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.core.env.MutablePropertySources; @@ -19,6 +18,7 @@ import org.springframework.core.env.PropertySource; import org.springframework.integration.MessageChannel; import org.springframework.integration.channel.AbstractMessageChannel; import org.springframework.integration.channel.PublishSubscribeChannel; +import org.springframework.integration.core.SubscribableChannel; import org.springframework.integration.flow.config.FlowUtils; import org.springframework.integration.flow.interceptor.FlowInterceptor; import org.springframework.integration.support.channel.BeanFactoryChannelResolver; @@ -33,11 +33,13 @@ import org.springframework.util.StringUtils; * @author David Turanski * */ -public class Flow implements InitializingBean, BeanNameAware, ChannelResolver, BeanDefinitionRegistryPostProcessor { +public class Flow implements InitializingBean, BeanNameAware, ChannelResolver, ApplicationContextAware { - protected Log logger = LogFactory.getLog(getClass()); + private static Log logger = LogFactory.getLog(Flow.class); private volatile ConfigurableApplicationContext flowContext; + + private ConfigurableApplicationContext applicationContext; private volatile FlowConfiguration flowConfiguration; @@ -80,12 +82,12 @@ public class Flow implements InitializingBean, BeanNameAware, ChannelResolver, B Assert.notEmpty(configLocations, "configLocations cannot be empty"); - flowContext = new ClassPathXmlApplicationContext(configLocations); + flowContext = new ClassPathXmlApplicationContext(configLocations, applicationContext); this.flowConfiguration = flowContext.getBean(FlowConfiguration.class); Assert.notNull(flowConfiguration, "flow context does not contain a flow configuration"); - if (help) { + if (this.help) { System.out.println(displayFlowConfiguration()); } @@ -94,6 +96,10 @@ public class Flow implements InitializingBean, BeanNameAware, ChannelResolver, B this.flowChannelResolver = new BeanFactoryChannelResolver(flowContext); addReferencedProperties(); + + bridgeMessagingPorts(); + + } @@ -136,17 +142,7 @@ public class Flow implements InitializingBean, BeanNameAware, ChannelResolver, B return flowChannelResolver.resolveChannelName(channelName); } - @Override - public void postProcessBeanFactory(ConfigurableListableBeanFactory arg0) throws BeansException { - // TODO Auto-generated method stub - - } - - @Override - public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException { - bridgeMessagingPorts(registry); - - } + /** * @@ -200,7 +196,8 @@ public class Flow implements InitializingBean, BeanNameAware, ChannelResolver, B } MutablePropertySources propertySources = flowContext.getEnvironment().getPropertySources(); propertySources.addLast(propertySource); - flowContext.refresh(); + + this.flowContext.refresh(); } } @@ -210,7 +207,7 @@ public class Flow implements InitializingBean, BeanNameAware, ChannelResolver, B "flow configuration contains no port configurations"); } - private void bridgeMessagingPorts(BeanDefinitionRegistry registry) { + private void bridgeMessagingPorts() { /* * create a bridge for each target output port to the flow outputChannel @@ -219,14 +216,20 @@ public class Flow implements InitializingBean, BeanNameAware, ChannelResolver, B .getPortConfigurations()) { for (String outputPort : targetPortConfiguration.getOutputPortNames()) { String targetOutputChannelName = (String) targetPortConfiguration.getOutputChannel(outputPort); - AbstractMessageChannel inputChannel = (AbstractMessageChannel) resolveChannelName(targetOutputChannelName); + SubscribableChannel inputChannel = (SubscribableChannel) resolveChannelName(targetOutputChannelName); - inputChannel.addInterceptor(new FlowInterceptor(outputPort)); + ((AbstractMessageChannel)inputChannel).addInterceptor(new FlowInterceptor(outputPort)); logger.debug("creating output bridge on [" + outputPort + "] inputChannelName = [" + targetOutputChannelName + "] outputChannel = [" + this.flowOutputChannel + "]"); - FlowUtils.createBridge(inputChannel, this.flowOutputChannel, registry); + FlowUtils.bridgeChannels(inputChannel, this.flowOutputChannel); } } } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = (ConfigurableApplicationContext) applicationContext; + + } } diff --git a/src/main/java/org/springframework/integration/flow/config/FlowMessageHandlerFactoryBean.java b/src/main/java/org/springframework/integration/flow/config/FlowMessageHandlerFactoryBean.java index 0967f7e..edbe1e4 100644 --- a/src/main/java/org/springframework/integration/flow/config/FlowMessageHandlerFactoryBean.java +++ b/src/main/java/org/springframework/integration/flow/config/FlowMessageHandlerFactoryBean.java @@ -15,11 +15,9 @@ */ package org.springframework.integration.flow.config; -import org.springframework.beans.BeansException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; -import org.springframework.beans.factory.support.BeanDefinitionRegistry; -import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor; import org.springframework.integration.MessageChannel; import org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean; import org.springframework.integration.core.MessageHandler; @@ -35,8 +33,10 @@ import org.springframework.util.Assert; * */ public class FlowMessageHandlerFactoryBean extends AbstractSimpleMessageHandlerFactoryBean implements - InitializingBean, BeanDefinitionRegistryPostProcessor { + InitializingBean { + private static Log logger = LogFactory.getLog(FlowMessageHandlerFactoryBean.class); + private volatile Flow flow; private volatile String inputPortName; @@ -73,22 +73,14 @@ public class FlowMessageHandlerFactoryBean extends AbstractSimpleMessageHandlerF this.flowReceiveChannel = flowOutputChannel; } - @Override - public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { - // TODO Auto-generated method stub - - } - - @Override - public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException { - bridgeMessagingPorts(registry); - - } - + - private void bridgeMessagingPorts(BeanDefinitionRegistry registry) { - FlowUtils.createBridge(this.flow.getFlowOutputChannel(), this.flowReceiveChannel, registry); + private void bridgeMessagingPorts( ) { + logger.debug("creating handler bridge on inputChannelName = [" + + flow.getFlowOutputChannel() + "] outputChannel = [" + this.flowReceiveChannel + "]"); + + FlowUtils.bridgeChannels(this.flow.getFlowOutputChannel(), this.flowReceiveChannel); } @Override @@ -107,6 +99,9 @@ public class FlowMessageHandlerFactoryBean extends AbstractSimpleMessageHandlerF } Assert.notEmpty(this.flowConfiguration.getOutputPortNames(), "flow [" + this.flow.getBeanName() + "] has no configured output ports"); + + + bridgeMessagingPorts(); } } diff --git a/src/main/java/org/springframework/integration/flow/config/FlowUtils.java b/src/main/java/org/springframework/integration/flow/config/FlowUtils.java index bc704e4..eda7f5a 100644 --- a/src/main/java/org/springframework/integration/flow/config/FlowUtils.java +++ b/src/main/java/org/springframework/integration/flow/config/FlowUtils.java @@ -14,12 +14,10 @@ package org.springframework.integration.flow.config; import org.apache.commons.lang.StringUtils; import org.springframework.beans.factory.config.BeanDefinition; -import org.springframework.beans.factory.support.AbstractBeanDefinition; -import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.support.BeanDefinitionRegistry; import org.springframework.integration.MessageChannel; -import org.springframework.integration.config.ConsumerEndpointFactoryBean; +import org.springframework.integration.core.SubscribableChannel; import org.springframework.integration.handler.BridgeHandler; /** @@ -27,28 +25,25 @@ import org.springframework.integration.handler.BridgeHandler; * */ public class FlowUtils { - /* - * create a bridge + /** + * Create a bridge + * @param inputChannel + * @param outputChannel */ - public static void createBridge(MessageChannel inputChannel, MessageChannel outputChannel, - BeanDefinitionRegistry registry) { - - BeanDefinitionBuilder handlerBuilder = BeanDefinitionBuilder - .genericBeanDefinition(BridgeHandler.class); - - handlerBuilder.addPropertyValue("outputChannel", outputChannel); - - AbstractBeanDefinition handlerBeanDefinition = handlerBuilder.getBeanDefinition(); - - BeanDefinitionBuilder consumerEndpointBuilder = BeanDefinitionBuilder - .genericBeanDefinition(ConsumerEndpointFactoryBean.class); - - String handlerBeanName = registerBeanDefinition(handlerBeanDefinition, registry); - consumerEndpointBuilder.addPropertyReference("handler", handlerBeanName); - consumerEndpointBuilder.addPropertyValue("inputChannel", inputChannel); - registerBeanDefinition(consumerEndpointBuilder.getBeanDefinition(), registry); - } + + public static void bridgeChannels(SubscribableChannel inputChannel, MessageChannel outputChannel) { + BridgeHandler bridgeHandler = new BridgeHandler(); + bridgeHandler.setOutputChannel(outputChannel); + inputChannel.subscribe(bridgeHandler); + } + + /** + * Register a bean with "flow" prefix + * @param beanDefinition + * @param registry + * @return + */ public static String registerBeanDefinition(BeanDefinition beanDefinition, BeanDefinitionRegistry registry){ String beanName = BeanDefinitionReaderUtils.generateBeanName(beanDefinition, registry); beanName = "flow."+ beanName; diff --git a/src/main/resources/META-INF/MANIFEST.MF b/src/main/resources/META-INF/MANIFEST.MF new file mode 100644 index 0000000..5e94951 --- /dev/null +++ b/src/main/resources/META-INF/MANIFEST.MF @@ -0,0 +1,3 @@ +Manifest-Version: 1.0 +Class-Path: + diff --git a/src/test/java/org/springframework/integration/flow/FlowUtilsTest.java b/src/test/java/org/springframework/integration/flow/FlowUtilsTest.java new file mode 100644 index 0000000..c289052 --- /dev/null +++ b/src/test/java/org/springframework/integration/flow/FlowUtilsTest.java @@ -0,0 +1,37 @@ +package org.springframework.integration.flow; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; + +import org.junit.Test; +import org.springframework.integration.Message; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.channel.PublishSubscribeChannel; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.core.PollableChannel; +import org.springframework.integration.core.SubscribableChannel; +import org.springframework.integration.flow.config.FlowUtils; +import org.springframework.integration.message.GenericMessage; + +public class FlowUtilsTest { + @Test + public void buildBridge(){ + + + SubscribableChannel inputChannel = new DirectChannel(); + SubscribableChannel outputChannel = new PublishSubscribeChannel(); + PollableChannel receiveChannel = new QueueChannel(); + + FlowUtils.bridgeChannels(inputChannel, outputChannel); + FlowUtils.bridgeChannels(outputChannel, receiveChannel); + + Message message = new GenericMessage("hello"); + + inputChannel.send(message); + Message result = receiveChannel.receive(100); + assertNotNull(result); + assertSame(message, result); + + } + +}