Register references to child context instead of the bindable proxies

Fix #819
This commit is contained in:
Marius Bogoevici
2017-02-21 11:32:30 -05:00
committed by Soby Chacko
parent 02fdd7c9dd
commit e46d7ede73
3 changed files with 31 additions and 7 deletions

View File

@@ -42,8 +42,8 @@ import static org.assertj.core.api.Assertions.assertThat;
* @author Marius Bogoevici
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = AggregateTestWithBean.ChainedProcessors.class, properties = {"server.port=-1"})
public class AggregateTestWithBean {
@SpringBootTest(classes = AggregateWithBeanTest.ChainedProcessors.class, properties = {"server.port=-1"})
public class AggregateWithBeanTest {
@Autowired
public MessageCollector messageCollector;

View File

@@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.aggregate.AggregateApplication;
import org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder;
import org.springframework.cloud.stream.annotation.EnableBinding;
@@ -37,7 +38,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Marius Bogoevici
*/
public class AggregateTestWithMain {
public class AggregateWithMainTest {
@Test
public void testAggregateApplication() throws InterruptedException {
@@ -60,6 +61,10 @@ public class AggregateTestWithMain {
@EnableBinding(Processor.class)
public static class UppercaseProcessor {
@Autowired
Processor processor;
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public String transform(String in) {
return in.toUpperCase();

View File

@@ -48,6 +48,7 @@ import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.PropertySources;
import org.springframework.integration.monitor.IntegrationMBeanExporter;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
@@ -59,7 +60,10 @@ import org.springframework.util.StringUtils;
* @author Venil Noronha
*/
@EnableBinding
public class AggregateApplicationBuilder implements AggregateApplication, ApplicationContextAware, SmartInitializingSingleton {
public class AggregateApplicationBuilder implements AggregateApplication, ApplicationContextAware,
SmartInitializingSingleton {
private static final String CHILD_CONTEXT_SUFFIX = ".spring.cloud.stream.context";
private SourceConfigurer sourceConfigurer;
@@ -153,7 +157,9 @@ public class AggregateApplicationBuilder implements AggregateApplication, Applic
throw new IllegalStateException("The aggregate application has not been started yet");
}
try {
return bindableType.cast(parentContext.getBean(namespace + "." + bindableType.getName()));
ChildContextHolder contextHolder = parentContext.getBean(namespace + CHILD_CONTEXT_SUFFIX,
ChildContextHolder.class);
return contextHolder.getChildContext().getBean(bindableType);
}
catch (BeansException e) {
throw new IllegalStateException("Binding not found for '" + bindableType.getName() + "' into namespace " +
@@ -383,8 +389,7 @@ public class AggregateApplicationBuilder implements AggregateApplication, Applic
for (String bindableProxyName : bindableProxies.keySet()) {
try {
AggregateApplicationBuilder.this.parentContext.getBeanFactory().registerSingleton(
this.getNamespace() + "." + bindableProxyName.substring(1, bindableProxyName.length()),
bindableProxies.get(bindableProxyName).getObject());
this.getNamespace() + CHILD_CONTEXT_SUFFIX, new ChildContextHolder(childContext));
}
catch (Exception e) {
throw new IllegalStateException(
@@ -462,6 +467,20 @@ public class AggregateApplicationBuilder implements AggregateApplication, Applic
}
private static class ChildContextHolder {
private final ConfigurableApplicationContext childContext;
ChildContextHolder(ConfigurableApplicationContext childContext) {
Assert.notNull(childContext, "cannot be null");
this.childContext = childContext;
}
public ConfigurableApplicationContext getChildContext() {
return childContext;
}
}
@ImportAutoConfiguration({ChannelBindingAutoConfiguration.class, EndpointAutoConfiguration.class})
@EnableBinding
public static class ParentConfiguration {