Upgrade to SI-Java_DSL-1.2.RC1

This commit is contained in:
Artem Bilan
2016-09-27 13:45:14 -04:00
parent 2e103f6f34
commit 62894391b5
3 changed files with 23 additions and 31 deletions

View File

@@ -6,7 +6,6 @@ import java.util.LinkedHashMap;
import java.util.Map.Entry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.DefaultSingletonBeanRegistry;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
@@ -17,6 +16,7 @@ import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableMessageHistory;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.context.IntegrationFlowContext;
import org.springframework.integration.dsl.context.IntegrationFlowRegistration;
import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter;
import org.springframework.integration.ip.tcp.TcpSendingMessageHandler;
import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory;
@@ -54,16 +54,16 @@ public class DynamicTcpClientApplication {
@Bean
public IntegrationFlow toTcp() {
return f -> f.route(router());
}
@Bean
public TcpRouter router() {
return new TcpRouter();
return f -> f.route(new TcpRouter());
}
// Two servers
@Bean
public TcpNetServerConnectionFactory cfOne() {
return new TcpNetServerConnectionFactory(1234);
}
@Bean
public TcpReceivingChannelAdapter inOne(TcpNetServerConnectionFactory cfOne) {
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
@@ -73,8 +73,8 @@ public class DynamicTcpClientApplication {
}
@Bean
public TcpNetServerConnectionFactory cfOne() {
return new TcpNetServerConnectionFactory(1234);
public TcpNetServerConnectionFactory cfTwo() {
return new TcpNetServerConnectionFactory(5678);
}
@Bean
@@ -85,11 +85,6 @@ public class DynamicTcpClientApplication {
return adapter;
}
@Bean
public TcpNetServerConnectionFactory cfTwo() {
return new TcpNetServerConnectionFactory(5678);
}
@Bean
public QueueChannel outputChannel() {
return new QueueChannel();
@@ -116,16 +111,13 @@ public class DynamicTcpClientApplication {
};
@Autowired
private ConfigurableApplicationContext context;
@Autowired
private IntegrationFlowContext flowContext;
@Override
protected synchronized Collection<MessageChannel> determineTargetChannels(Message<?> message) {
MessageChannel channel = this.subFlows
.get("" + message.getHeaders().get("host") + message.getHeaders().get("port"));
.get(message.getHeaders().get("host", String.class) + message.getHeaders().get("port"));
if (channel == null) {
channel = createNewSubflow(message);
}
@@ -139,22 +131,22 @@ public class DynamicTcpClientApplication {
String hostPort = host + port;
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
this.context.getBeanFactory().registerSingleton(hostPort + ".cf", cf);
this.context.getBeanFactory().initializeBean(cf, hostPort+".cf");
cf.afterPropertiesSet();
TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
handler.setConnectionFactory(cf);
IntegrationFlow flow = f -> f.handle(handler);
this.flowContext.register(hostPort + ".flow", flow);
MessageChannel channel = this.flowContext.messagingTemplateFor(hostPort + ".flow").getDefaultDestination();
this.subFlows.put(hostPort, channel);
return channel;
IntegrationFlowRegistration flowRegistration =
this.flowContext.registration(flow)
.addBean(cf)
.id(hostPort + ".flow")
.register();
MessageChannel inputChannel = flowRegistration.getInputChannel();
this.subFlows.put(hostPort, inputChannel);
return inputChannel;
}
private void removeSubFlow(Entry<String, MessageChannel> eldest) {
String hostPort = eldest.getKey();
this.flowContext.remove(hostPort + ".flow");
((DefaultSingletonBeanRegistry) this.context.getBeanFactory()).destroySingleton(hostPort + ".cf");
}
}

View File

@@ -197,7 +197,7 @@ subprojects { subproject ->
subethasmtpVersion = '1.2'
slf4jVersion = '1.7.11'
springIntegrationVersion = '4.3.2.RELEASE'
springIntegrationDslVersion = '1.2.0.M2'
springIntegrationDslVersion = '1.2.0.RC1'
springIntegrationKafkaVersion = '2.1.0.RELEASE'
springIntegrationSplunkVersion = '1.1.0.RELEASE'
springKafkaVersion = '1.1.0.RELEASE'
@@ -1350,7 +1350,7 @@ project('kafka-dsl') {
compile 'org.springframework.boot:spring-boot-starter-integration'
compile "org.springframework.integration:spring-integration-core:$springIntegrationVersion"
compile "org.springframework.integration:spring-integration-java-dsl:$springIntegrationDslVersion"
compile "org.springframework.kafka:spring-kafka:$springKafkaVersion"
compile ("org.springframework.integration:spring-integration-kafka:$springIntegrationKafkaVersion")
compile ("org.springframework.kafka:spring-kafka-test:$springKafkaVersion") {
exclude group: 'org.slf4j'
}

View File

@@ -38,7 +38,7 @@ import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.kafka.Kafka09;
import org.springframework.integration.dsl.kafka.Kafka;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -130,7 +130,7 @@ public class Application {
@Bean
public IntegrationFlow toKafka() {
return f -> f
.handle(Kafka09.outboundChannelAdapter(producerFactory())
.handle(Kafka.outboundChannelAdapter(producerFactory())
.topic(this.topic)
.messageKey(this.messageKey));
}
@@ -150,7 +150,7 @@ public class Application {
@Bean
public IntegrationFlow fromKafka() {
return IntegrationFlows
.from(Kafka09.messageDriverChannelAdapter(consumerFactory(), this.topic))
.from(Kafka.messageDrivenChannelAdapter(consumerFactory(), this.topic))
.channel(c -> c.queue("fromKafka"))
.get();
}