diff --git a/build.gradle b/build.gradle index e549a087..620155be 100644 --- a/build.gradle +++ b/build.gradle @@ -1301,6 +1301,34 @@ project('stored-procedures-postgresql') { } } +project('tcp-async-bi-directional') { + description = 'Bi-Directional TCP Sample' + + apply plugin: 'org.springframework.boot' + + dependencies { + compile 'org.springframework.boot:spring-boot-starter-integration' + compile "org.springframework.integration:spring-integration-ip" + + testCompile 'org.springframework.boot:spring-boot-starter-test' + testCompile "org.springframework.integration:spring-integration-test" + } + + bootRun { + main = 'org.springframework.integration.samples.tcpasyncbi.TcpAsyncBiDirectionalApplication' + } + + task run(type: JavaExec) { + main 'org.springframework.integration.samples.tcpasyncbi.TcpAsyncBiDirectionalApplication' + classpath = sourceSets.main.runtimeClasspath + } + + test { + useJUnitPlatform() + } + +} + project('tcp-client-server-multiplex') { description = 'TCP Client Server Multiplexing Sample' diff --git a/gradle.properties b/gradle.properties index e88ed83f..16bc4d5b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -version=5.3.0.BUILD-SNAPSHOT -springBootVersion=2.3.0.BUILD-SNAPSHOT +version=5.3.0.RELEASE +springBootVersion=2.3.0.RELEASE org.gradle.jvmargs=-Xmx1536m diff --git a/intermediate/tcp-async-bi-directional/.gitignore b/intermediate/tcp-async-bi-directional/.gitignore new file mode 100644 index 00000000..a2a3040a --- /dev/null +++ b/intermediate/tcp-async-bi-directional/.gitignore @@ -0,0 +1,31 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/** +!**/src/test/** + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ + +### VS Code ### +.vscode/ diff --git a/intermediate/tcp-async-bi-directional/README.md b/intermediate/tcp-async-bi-directional/README.md new file mode 100644 index 00000000..7f7fd059 --- /dev/null +++ b/intermediate/tcp-async-bi-directional/README.md @@ -0,0 +1,6 @@ +TCP Async Bi-Directional Sample +================================== + +If this is your first experience with the spring-integration-ip module, start with the **tcp-client-server** project in the basic folder. + +This sample demonstrates asynchronous, arbitrary bi-directional communication between 2 peers. diff --git a/intermediate/tcp-async-bi-directional/pom.xml b/intermediate/tcp-async-bi-directional/pom.xml new file mode 100644 index 00000000..b7045807 --- /dev/null +++ b/intermediate/tcp-async-bi-directional/pom.xml @@ -0,0 +1,215 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.3.0.RELEASE + + org.springframework.integration.samples + tcp-async-bi-directional + 5.3.0.RELEASE + Bi-Directional TCP Sample + Bi-Directional TCP Sample + https://projects.spring.io/spring-integration + + SpringIO + https://spring.io + + + + The Apache Software License, Version 2.0 + https://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + + abilan + Artem Bilan + abilan@pivotal.io + + project lead + + + + garyrussell + Gary Russell + grussell@pivotal.io + + lead emeritus + + + + markfisher + Mark Fisher + mfisher@pivotal.io + + project founder and lead emeritus + + + + ghillert + Gunnar Hillert + ghillert@pivotal.io + + + + scm:git:scm:git:git://github.com/spring-projects/spring-integration-samples.git + scm:git:scm:git:ssh://git@github.com:spring-projects/spring-integration-samples.git + https://github.com/spring-projects/spring-integration-samples + + + + org.springframework.boot + spring-boot-starter-integration + compile + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + + + org.springframework.integration + spring-integration-ip + compile + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + + + junit + junit + 4.12 + test + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + * + org.hamcrest + + + + + org.hamcrest + hamcrest-all + 1.3 + test + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + + + org.mockito + mockito-core + 3.2.4 + test + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + * + org.hamcrest + + + + + org.springframework + spring-test + test + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + + + org.springframework.boot + spring-boot-starter-test + test + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + + + org.springframework.integration + spring-integration-test + test + + + jackson-module-kotlin + com.fasterxml.jackson.module + + + + + + + repo.spring.io.milestone + Spring Framework Maven Milestone Repository + https://repo.spring.io/libs-milestone + + + repo.spring.io.snapshot + Spring Framework Maven Snapshot Repository + https://repo.spring.io/libs-snapshot + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + + + org.springframework.boot + spring-boot-dependencies + 2.3.0.RELEASE + import + pom + + + com.fasterxml.jackson + jackson-bom + 2.10.3 + import + pom + + + org.springframework + spring-framework-bom + 5.2.6.RELEASE + import + pom + + + org.springframework.integration + spring-integration-bom + 5.3.0.RELEASE + import + pom + + + + diff --git a/intermediate/tcp-async-bi-directional/src/main/java/org/springframework/integration/samples/tcpasyncbi/SampleProperties.java b/intermediate/tcp-async-bi-directional/src/main/java/org/springframework/integration/samples/tcpasyncbi/SampleProperties.java new file mode 100644 index 00000000..2224e6ca --- /dev/null +++ b/intermediate/tcp-async-bi-directional/src/main/java/org/springframework/integration/samples/tcpasyncbi/SampleProperties.java @@ -0,0 +1,41 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.tcpasyncbi; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * Properties for the sample. + * + * @author Gary Russell + * @since 5.3 + * + */ +@ConfigurationProperties("tcp") +public class SampleProperties { + + private int serverPort; + + public int getServerPort() { + return this.serverPort; + } + + public void setServerPort(int serverPort) { + this.serverPort = serverPort; + } + +} diff --git a/intermediate/tcp-async-bi-directional/src/main/java/org/springframework/integration/samples/tcpasyncbi/TcpAsyncBiDirectionalApplication.java b/intermediate/tcp-async-bi-directional/src/main/java/org/springframework/integration/samples/tcpasyncbi/TcpAsyncBiDirectionalApplication.java new file mode 100644 index 00000000..44cfbe71 --- /dev/null +++ b/intermediate/tcp-async-bi-directional/src/main/java/org/springframework/integration/samples/tcpasyncbi/TcpAsyncBiDirectionalApplication.java @@ -0,0 +1,146 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.tcpasyncbi; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.event.EventListener; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.dsl.IntegrationFlows; +import org.springframework.integration.dsl.Pollers; +import org.springframework.integration.dsl.Transformers; +import org.springframework.integration.ip.IpHeaders; +import org.springframework.integration.ip.dsl.Tcp; +import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory; +import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory; +import org.springframework.integration.ip.tcp.connection.TcpConnectionCloseEvent; +import org.springframework.integration.ip.tcp.connection.TcpConnectionOpenEvent; + +/** + * Demonstrates independent bi-directional communication between peers. + * One side opens the connection and they proceed to send messages to each + * other on different schedules. + * There are two client instances. + * + * @author Gary Russell + * @since 5.3 + * + */ +@SpringBootApplication +@EnableConfigurationProperties(SampleProperties.class) +public class TcpAsyncBiDirectionalApplication { + + public static void main(String[] args) { + SpringApplication.run(TcpAsyncBiDirectionalApplication.class, args); + } + +} + +@Configuration +class ClientPeer { + + @Bean + public AbstractClientConnectionFactory client1(SampleProperties properties) { + return Tcp.netClient("localhost", properties.getServerPort()).get(); + } + + @Bean + public IntegrationFlow client1Out(AbstractClientConnectionFactory client1) { + return IntegrationFlows.from(() -> "Hello from client1", e -> e.id("client1Adapter") + .poller(Pollers.fixedDelay(3000))) + .handle(Tcp.outboundAdapter(client1)) + .get(); + } + + @Bean + public IntegrationFlow client1In(AbstractClientConnectionFactory client1) { + return IntegrationFlows.from(Tcp.inboundAdapter(client1)) + .transform(Transformers.objectToString()) + .log(msg -> "client1: " + msg.getPayload()) + .get(); + } + + @Bean + public AbstractClientConnectionFactory client2(SampleProperties properties) { + return Tcp.netClient("localhost", properties.getServerPort()).get(); + } + + @Bean + public IntegrationFlow client2Out(AbstractClientConnectionFactory client2) { + return IntegrationFlows.from(() -> "Hello from client2", e -> e.id("client2Adapter") + .poller(Pollers.fixedDelay(2000))) + .handle(Tcp.outboundAdapter(client2)) + .get(); + } + + @Bean + public IntegrationFlow client2In(AbstractClientConnectionFactory client2) { + return IntegrationFlows.from(Tcp.inboundAdapter(client2)) + .transform(Transformers.objectToString()) + .log(msg -> "client2: " + msg.getPayload()) + .get(); + } + +} + +@Configuration +class ServerPeer { + + private final Set clients = ConcurrentHashMap.newKeySet(); + + @Bean + public AbstractServerConnectionFactory server(SampleProperties properties) { + return Tcp.netServer(properties.getServerPort()).get(); + } + + @Bean + public IntegrationFlow serverIn(AbstractServerConnectionFactory server) { + return IntegrationFlows.from(Tcp.inboundAdapter(server)) + .transform(Transformers.objectToString()) + .log(msg -> "server: " + msg.getPayload()) + .get(); + } + + @Bean + public IntegrationFlow serverOut(AbstractServerConnectionFactory server) { + return IntegrationFlows.from(() -> "seed", e -> e.poller(Pollers.fixedDelay(5000))) + .split(this.clients, "iterator") + .enrichHeaders(h -> h.headerExpression(IpHeaders.CONNECTION_ID, "payload")) + .transform(p -> "Hello from server") + .handle(Tcp.outboundAdapter(server)) + .get(); + } + + @EventListener + public void open(TcpConnectionOpenEvent event) { + if (event.getConnectionFactoryName().equals("server")) { + this.clients.add(event.getConnectionId()); + } + } + + @EventListener + public void close(TcpConnectionCloseEvent event) { + this.clients.remove(event.getConnectionId()); + } + +} diff --git a/intermediate/tcp-async-bi-directional/src/main/resources/application.yml b/intermediate/tcp-async-bi-directional/src/main/resources/application.yml new file mode 100644 index 00000000..71da3633 --- /dev/null +++ b/intermediate/tcp-async-bi-directional/src/main/resources/application.yml @@ -0,0 +1,5 @@ +tcp: + server-port: 1234 +#logging: +# level: +# org.springframework.integration: debug diff --git a/intermediate/tcp-async-bi-directional/src/test/java/org/springframework/integration/samples/tcpasyncbi/TcpAsyncBiDirectionalApplicationTests.java b/intermediate/tcp-async-bi-directional/src/test/java/org/springframework/integration/samples/tcpasyncbi/TcpAsyncBiDirectionalApplicationTests.java new file mode 100644 index 00000000..d08fe95b --- /dev/null +++ b/intermediate/tcp-async-bi-directional/src/test/java/org/springframework/integration/samples/tcpasyncbi/TcpAsyncBiDirectionalApplicationTests.java @@ -0,0 +1,87 @@ +package org.springframework.integration.samples.tcpasyncbi; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.integration.channel.AbstractMessageChannel; +import org.springframework.integration.endpoint.SourcePollingChannelAdapter; +import org.springframework.integration.test.context.SpringIntegrationTest; +import org.springframework.lang.Nullable; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.ChannelInterceptor; + +@SpringBootTest +@SpringIntegrationTest(noAutoStartup = { "client1Adapter", "client2Adapter" }) +class TcpAsyncBiDirectionalApplicationTests { + + @Autowired + @Qualifier("client1Adapter") + private SourcePollingChannelAdapter adapter1; + + @Autowired + @Qualifier("client2Adapter") + private SourcePollingChannelAdapter adapter2; + + @Autowired + @Qualifier("client1In.channel#0") + private AbstractMessageChannel client1In; + + @Autowired + @Qualifier("client2In.channel#0") + private AbstractMessageChannel client2In; + + @Autowired + @Qualifier("serverIn.channel#0") + private AbstractMessageChannel serverIn; + + @Test + void testBothReceive() throws InterruptedException { + CountDownLatch serverLatch = new CountDownLatch(1); + CountDownLatch client1Latch = new CountDownLatch(1); + CountDownLatch client2Latch = new CountDownLatch(1); + this.serverIn.addInterceptor(new ChannelInterceptor() { + + @Override + @Nullable + public Message preSend(Message message, MessageChannel channel) { + serverLatch.countDown(); + return message; + } + + }); + this.client1In.addInterceptor(new ChannelInterceptor() { + + @Override + @Nullable + public Message preSend(Message message, MessageChannel channel) { + client1Latch.countDown(); + return message; + } + + }); + this.client2In.addInterceptor(new ChannelInterceptor() { + + @Override + @Nullable + public Message preSend(Message message, MessageChannel channel) { + client2Latch.countDown(); + return message; + } + + }); + this.adapter1.start(); + this.adapter2.start(); + assertThat(serverLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(client1Latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(client2Latch.await(10, TimeUnit.SECONDS)).isTrue(); + } + +}