diff --git a/advanced/dynamic-tcp-client/src/main/java/org/springframework/integration/samples/dynamictcp/DynamicTcpClientApplication.java b/advanced/dynamic-tcp-client/src/main/java/org/springframework/integration/samples/dynamictcp/DynamicTcpClientApplication.java index 236bb320..d32ddd90 100644 --- a/advanced/dynamic-tcp-client/src/main/java/org/springframework/integration/samples/dynamictcp/DynamicTcpClientApplication.java +++ b/advanced/dynamic-tcp-client/src/main/java/org/springframework/integration/samples/dynamictcp/DynamicTcpClientApplication.java @@ -6,7 +6,6 @@ import java.util.LinkedHashMap; import java.util.Map.Entry; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.support.DefaultSingletonBeanRegistry; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ConfigurableApplicationContext; @@ -17,6 +16,7 @@ import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.EnableMessageHistory; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.context.IntegrationFlowContext; +import org.springframework.integration.dsl.context.IntegrationFlowRegistration; import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter; import org.springframework.integration.ip.tcp.TcpSendingMessageHandler; import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory; @@ -54,16 +54,16 @@ public class DynamicTcpClientApplication { @Bean public IntegrationFlow toTcp() { - return f -> f.route(router()); - } - - @Bean - public TcpRouter router() { - return new TcpRouter(); + return f -> f.route(new TcpRouter()); } // Two servers + @Bean + public TcpNetServerConnectionFactory cfOne() { + return new TcpNetServerConnectionFactory(1234); + } + @Bean public TcpReceivingChannelAdapter inOne(TcpNetServerConnectionFactory cfOne) { TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter(); @@ -73,8 +73,8 @@ public class DynamicTcpClientApplication { } @Bean - public TcpNetServerConnectionFactory cfOne() { - return new TcpNetServerConnectionFactory(1234); + public TcpNetServerConnectionFactory cfTwo() { + return new TcpNetServerConnectionFactory(5678); } @Bean @@ -85,11 +85,6 @@ public class DynamicTcpClientApplication { return adapter; } - @Bean - public TcpNetServerConnectionFactory cfTwo() { - return new TcpNetServerConnectionFactory(5678); - } - @Bean public QueueChannel outputChannel() { return new QueueChannel(); @@ -116,16 +111,13 @@ public class DynamicTcpClientApplication { }; - @Autowired - private ConfigurableApplicationContext context; - @Autowired private IntegrationFlowContext flowContext; @Override protected synchronized Collection determineTargetChannels(Message message) { MessageChannel channel = this.subFlows - .get("" + message.getHeaders().get("host") + message.getHeaders().get("port")); + .get(message.getHeaders().get("host", String.class) + message.getHeaders().get("port")); if (channel == null) { channel = createNewSubflow(message); } @@ -139,22 +131,22 @@ public class DynamicTcpClientApplication { String hostPort = host + port; TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port); - this.context.getBeanFactory().registerSingleton(hostPort + ".cf", cf); - this.context.getBeanFactory().initializeBean(cf, hostPort+".cf"); - cf.afterPropertiesSet(); TcpSendingMessageHandler handler = new TcpSendingMessageHandler(); handler.setConnectionFactory(cf); IntegrationFlow flow = f -> f.handle(handler); - this.flowContext.register(hostPort + ".flow", flow); - MessageChannel channel = this.flowContext.messagingTemplateFor(hostPort + ".flow").getDefaultDestination(); - this.subFlows.put(hostPort, channel); - return channel; + IntegrationFlowRegistration flowRegistration = + this.flowContext.registration(flow) + .addBean(cf) + .id(hostPort + ".flow") + .register(); + MessageChannel inputChannel = flowRegistration.getInputChannel(); + this.subFlows.put(hostPort, inputChannel); + return inputChannel; } private void removeSubFlow(Entry eldest) { String hostPort = eldest.getKey(); this.flowContext.remove(hostPort + ".flow"); - ((DefaultSingletonBeanRegistry) this.context.getBeanFactory()).destroySingleton(hostPort + ".cf"); } } diff --git a/build.gradle b/build.gradle index 7f72fbc8..f91ecbd9 100644 --- a/build.gradle +++ b/build.gradle @@ -197,7 +197,7 @@ subprojects { subproject -> subethasmtpVersion = '1.2' slf4jVersion = '1.7.11' springIntegrationVersion = '4.3.2.RELEASE' - springIntegrationDslVersion = '1.2.0.M2' + springIntegrationDslVersion = '1.2.0.RC1' springIntegrationKafkaVersion = '2.1.0.RELEASE' springIntegrationSplunkVersion = '1.1.0.RELEASE' springKafkaVersion = '1.1.0.RELEASE' @@ -1350,7 +1350,7 @@ project('kafka-dsl') { compile 'org.springframework.boot:spring-boot-starter-integration' compile "org.springframework.integration:spring-integration-core:$springIntegrationVersion" compile "org.springframework.integration:spring-integration-java-dsl:$springIntegrationDslVersion" - compile "org.springframework.kafka:spring-kafka:$springKafkaVersion" + compile ("org.springframework.integration:spring-integration-kafka:$springIntegrationKafkaVersion") compile ("org.springframework.kafka:spring-kafka-test:$springKafkaVersion") { exclude group: 'org.slf4j' } diff --git a/dsl/kafka-dsl/src/main/java/org/springframework/integration/samples/dsl/kafka/Application.java b/dsl/kafka-dsl/src/main/java/org/springframework/integration/samples/dsl/kafka/Application.java index 2d6473cb..a1e4ecff 100644 --- a/dsl/kafka-dsl/src/main/java/org/springframework/integration/samples/dsl/kafka/Application.java +++ b/dsl/kafka-dsl/src/main/java/org/springframework/integration/samples/dsl/kafka/Application.java @@ -38,7 +38,7 @@ import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; -import org.springframework.integration.dsl.kafka.Kafka09; +import org.springframework.integration.dsl.kafka.Kafka; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; @@ -130,7 +130,7 @@ public class Application { @Bean public IntegrationFlow toKafka() { return f -> f - .handle(Kafka09.outboundChannelAdapter(producerFactory()) + .handle(Kafka.outboundChannelAdapter(producerFactory()) .topic(this.topic) .messageKey(this.messageKey)); } @@ -150,7 +150,7 @@ public class Application { @Bean public IntegrationFlow fromKafka() { return IntegrationFlows - .from(Kafka09.messageDriverChannelAdapter(consumerFactory(), this.topic)) + .from(Kafka.messageDrivenChannelAdapter(consumerFactory(), this.topic)) .channel(c -> c.queue("fromKafka")) .get(); }