AbstractEndpointParser creates a DirectChannel for an 'input-channel' that is not explicitly configured within the context.
This commit is contained in:
@@ -18,10 +18,12 @@ package org.springframework.integration.config;
|
||||
|
||||
import org.w3c.dom.Element;
|
||||
|
||||
import org.springframework.beans.factory.config.BeanDefinitionHolder;
|
||||
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
|
||||
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
|
||||
import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser;
|
||||
import org.springframework.beans.factory.xml.ParserContext;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.endpoint.config.ConsumerEndpointFactoryBean;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.StringUtils;
|
||||
@@ -97,6 +99,11 @@ public abstract class AbstractEndpointParser extends AbstractSingleBeanDefinitio
|
||||
String inputChannelAttributeName = this.getInputChannelAttributeName();
|
||||
String inputChannelName = element.getAttribute(inputChannelAttributeName);
|
||||
Assert.hasText(inputChannelName, "the '" + inputChannelAttributeName + "' attribute is required");
|
||||
if (!parserContext.getRegistry().containsBeanDefinition(inputChannelName)) {
|
||||
BeanDefinitionBuilder channelDef = BeanDefinitionBuilder.genericBeanDefinition(DirectChannel.class);
|
||||
BeanDefinitionHolder holder = new BeanDefinitionHolder(channelDef.getBeanDefinition(), inputChannelName);
|
||||
BeanDefinitionReaderUtils.registerBeanDefinition(holder, parserContext.getRegistry());
|
||||
}
|
||||
builder.addPropertyValue("inputChannelName", inputChannelName);
|
||||
Element pollerElement = DomUtils.getChildElementByTagName(element, POLLER_ELEMENT);
|
||||
if (pollerElement != null) {
|
||||
|
||||
@@ -53,7 +53,7 @@ public abstract class AbstractMessageHandlingEndpoint extends AbstractMessageCon
|
||||
this.outputChannel = outputChannel;
|
||||
}
|
||||
|
||||
public MessageChannel getOutputChannel() {
|
||||
protected MessageChannel getOutputChannel() {
|
||||
return this.outputChannel;
|
||||
}
|
||||
|
||||
|
||||
@@ -24,7 +24,6 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||
import org.springframework.core.task.TaskExecutor;
|
||||
import org.springframework.integration.channel.ChannelRegistry;
|
||||
import org.springframework.integration.channel.ChannelRegistryAware;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.channel.MessageChannel;
|
||||
import org.springframework.integration.channel.PollableChannel;
|
||||
import org.springframework.integration.channel.SubscribableChannel;
|
||||
@@ -141,37 +140,31 @@ public class ConsumerEndpointFactoryBean implements FactoryBean, ChannelRegistry
|
||||
if (this.initialized) {
|
||||
return;
|
||||
}
|
||||
if (!this.beanFactory.containsBean(this.inputChannelName)) {
|
||||
DirectChannel channel = new DirectChannel();
|
||||
channel.setBeanName(this.inputChannelName);
|
||||
this.beanFactory.registerSingleton(this.inputChannelName, channel);
|
||||
this.endpoint = new SubscribingConsumerEndpoint(this.consumer, channel);
|
||||
Assert.isTrue(this.beanFactory.containsBean(this.inputChannelName),
|
||||
"no such input channel '" + this.inputChannelName + "'");
|
||||
MessageChannel channel = (MessageChannel)
|
||||
this.beanFactory.getBean(this.inputChannelName, MessageChannel.class);
|
||||
if (channel instanceof SubscribableChannel) {
|
||||
this.endpoint = new SubscribingConsumerEndpoint(
|
||||
this.consumer, (SubscribableChannel) channel);
|
||||
}
|
||||
else if (channel instanceof PollableChannel) {
|
||||
if (this.trigger == null) {
|
||||
this.trigger = new IntervalTrigger(0);
|
||||
}
|
||||
PollingConsumerEndpoint pollingEndpoint = new PollingConsumerEndpoint(
|
||||
this.consumer, (PollableChannel) channel);
|
||||
pollingEndpoint.setTrigger(this.trigger);
|
||||
pollingEndpoint.setMaxMessagesPerPoll(this.maxMessagesPerPoll);
|
||||
pollingEndpoint.setReceiveTimeout(this.receiveTimeout);
|
||||
pollingEndpoint.setTaskExecutor(this.taskExecutor);
|
||||
pollingEndpoint.setTransactionManager(this.transactionManager);
|
||||
pollingEndpoint.setTransactionDefinition(this.transactionDefinition);
|
||||
this.endpoint = pollingEndpoint;
|
||||
}
|
||||
else {
|
||||
MessageChannel channel = (MessageChannel)
|
||||
this.beanFactory.getBean(this.inputChannelName, MessageChannel.class);
|
||||
if (channel instanceof SubscribableChannel) {
|
||||
this.endpoint = new SubscribingConsumerEndpoint(
|
||||
this.consumer, (SubscribableChannel) channel);
|
||||
}
|
||||
else if (channel instanceof PollableChannel) {
|
||||
if (this.trigger == null) {
|
||||
this.trigger = new IntervalTrigger(0);
|
||||
}
|
||||
PollingConsumerEndpoint pollingEndpoint = new PollingConsumerEndpoint(
|
||||
this.consumer, (PollableChannel) channel);
|
||||
pollingEndpoint.setTrigger(this.trigger);
|
||||
pollingEndpoint.setMaxMessagesPerPoll(this.maxMessagesPerPoll);
|
||||
pollingEndpoint.setReceiveTimeout(this.receiveTimeout);
|
||||
pollingEndpoint.setTaskExecutor(this.taskExecutor);
|
||||
pollingEndpoint.setTransactionManager(this.transactionManager);
|
||||
pollingEndpoint.setTransactionDefinition(this.transactionDefinition);
|
||||
this.endpoint = pollingEndpoint;
|
||||
}
|
||||
else {
|
||||
throw new IllegalArgumentException(
|
||||
"unsupported channel type: [" + channel.getClass() + "]");
|
||||
}
|
||||
throw new IllegalArgumentException(
|
||||
"unsupported channel type: [" + channel.getClass() + "]");
|
||||
}
|
||||
this.initialized = true;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user