Add TCP Bi-Directional Async Sample

* Add a second client and address PR comments.
This commit is contained in:
Gary Russell
2020-06-02 16:13:25 -04:00
committed by GitHub
parent 18f3fcf912
commit 8a83eb23dd
9 changed files with 561 additions and 2 deletions

View File

@@ -1301,6 +1301,34 @@ project('stored-procedures-postgresql') {
}
}
project('tcp-async-bi-directional') {
description = 'Bi-Directional TCP Sample'
apply plugin: 'org.springframework.boot'
dependencies {
compile 'org.springframework.boot:spring-boot-starter-integration'
compile "org.springframework.integration:spring-integration-ip"
testCompile 'org.springframework.boot:spring-boot-starter-test'
testCompile "org.springframework.integration:spring-integration-test"
}
bootRun {
main = 'org.springframework.integration.samples.tcpasyncbi.TcpAsyncBiDirectionalApplication'
}
task run(type: JavaExec) {
main 'org.springframework.integration.samples.tcpasyncbi.TcpAsyncBiDirectionalApplication'
classpath = sourceSets.main.runtimeClasspath
}
test {
useJUnitPlatform()
}
}
project('tcp-client-server-multiplex') {
description = 'TCP Client Server Multiplexing Sample'

View File

@@ -1,3 +1,3 @@
version=5.3.0.BUILD-SNAPSHOT
springBootVersion=2.3.0.BUILD-SNAPSHOT
version=5.3.0.RELEASE
springBootVersion=2.3.0.RELEASE
org.gradle.jvmargs=-Xmx1536m

View File

@@ -0,0 +1,31 @@
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**
!**/src/test/**
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
### VS Code ###
.vscode/

View File

@@ -0,0 +1,6 @@
TCP Async Bi-Directional Sample
==================================
If this is your first experience with the spring-integration-ip module, start with the **tcp-client-server** project in the basic folder.
This sample demonstrates asynchronous, arbitrary bi-directional communication between 2 peers.

View File

@@ -0,0 +1,215 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
</parent>
<groupId>org.springframework.integration.samples</groupId>
<artifactId>tcp-async-bi-directional</artifactId>
<version>5.3.0.RELEASE</version>
<name>Bi-Directional TCP Sample</name>
<description>Bi-Directional TCP Sample</description>
<url>https://projects.spring.io/spring-integration</url>
<organization>
<name>SpringIO</name>
<url>https://spring.io</url>
</organization>
<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
<distribution>repo</distribution>
</license>
</licenses>
<developers>
<developer>
<id>abilan</id>
<name>Artem Bilan</name>
<email>abilan@pivotal.io</email>
<roles>
<role>project lead</role>
</roles>
</developer>
<developer>
<id>garyrussell</id>
<name>Gary Russell</name>
<email>grussell@pivotal.io</email>
<roles>
<role>lead emeritus</role>
</roles>
</developer>
<developer>
<id>markfisher</id>
<name>Mark Fisher</name>
<email>mfisher@pivotal.io</email>
<roles>
<role>project founder and lead emeritus</role>
</roles>
</developer>
<developer>
<id>ghillert</id>
<name>Gunnar Hillert</name>
<email>ghillert@pivotal.io</email>
</developer>
</developers>
<scm>
<connection>scm:git:scm:git:git://github.com/spring-projects/spring-integration-samples.git</connection>
<developerConnection>scm:git:scm:git:ssh://git@github.com:spring-projects/spring-integration-samples.git</developerConnection>
<url>https://github.com/spring-projects/spring-integration-samples</url>
</scm>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<scope>compile</scope>
<exclusions>
<exclusion>
<artifactId>jackson-module-kotlin</artifactId>
<groupId>com.fasterxml.jackson.module</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ip</artifactId>
<scope>compile</scope>
<exclusions>
<exclusion>
<artifactId>jackson-module-kotlin</artifactId>
<groupId>com.fasterxml.jackson.module</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>jackson-module-kotlin</artifactId>
<groupId>com.fasterxml.jackson.module</groupId>
</exclusion>
<exclusion>
<artifactId>*</artifactId>
<groupId>org.hamcrest</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>jackson-module-kotlin</artifactId>
<groupId>com.fasterxml.jackson.module</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.2.4</version>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>jackson-module-kotlin</artifactId>
<groupId>com.fasterxml.jackson.module</groupId>
</exclusion>
<exclusion>
<artifactId>*</artifactId>
<groupId>org.hamcrest</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>jackson-module-kotlin</artifactId>
<groupId>com.fasterxml.jackson.module</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>jackson-module-kotlin</artifactId>
<groupId>com.fasterxml.jackson.module</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>jackson-module-kotlin</artifactId>
<groupId>com.fasterxml.jackson.module</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<repositories>
<repository>
<id>repo.spring.io.milestone</id>
<name>Spring Framework Maven Milestone Repository</name>
<url>https://repo.spring.io/libs-milestone</url>
</repository>
<repository>
<id>repo.spring.io.snapshot</id>
<name>Spring Framework Maven Snapshot Repository</name>
<url>https://repo.spring.io/libs-snapshot</url>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.3.0.RELEASE</version>
<scope>import</scope>
<type>pom</type>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson</groupId>
<artifactId>jackson-bom</artifactId>
<version>2.10.3</version>
<scope>import</scope>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-framework-bom</artifactId>
<version>5.2.6.RELEASE</version>
<scope>import</scope>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-bom</artifactId>
<version>5.3.0.RELEASE</version>
<scope>import</scope>
<type>pom</type>
</dependency>
</dependencies>
</dependencyManagement>
</project>

View File

@@ -0,0 +1,41 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.samples.tcpasyncbi;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* Properties for the sample.
*
* @author Gary Russell
* @since 5.3
*
*/
@ConfigurationProperties("tcp")
public class SampleProperties {
private int serverPort;
public int getServerPort() {
return this.serverPort;
}
public void setServerPort(int serverPort) {
this.serverPort = serverPort;
}
}

View File

@@ -0,0 +1,146 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.samples.tcpasyncbi;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.dsl.Transformers;
import org.springframework.integration.ip.IpHeaders;
import org.springframework.integration.ip.dsl.Tcp;
import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpConnectionCloseEvent;
import org.springframework.integration.ip.tcp.connection.TcpConnectionOpenEvent;
/**
* Demonstrates independent bi-directional communication between peers.
* One side opens the connection and they proceed to send messages to each
* other on different schedules.
* There are two client instances.
*
* @author Gary Russell
* @since 5.3
*
*/
@SpringBootApplication
@EnableConfigurationProperties(SampleProperties.class)
public class TcpAsyncBiDirectionalApplication {
public static void main(String[] args) {
SpringApplication.run(TcpAsyncBiDirectionalApplication.class, args);
}
}
@Configuration
class ClientPeer {
@Bean
public AbstractClientConnectionFactory client1(SampleProperties properties) {
return Tcp.netClient("localhost", properties.getServerPort()).get();
}
@Bean
public IntegrationFlow client1Out(AbstractClientConnectionFactory client1) {
return IntegrationFlows.from(() -> "Hello from client1", e -> e.id("client1Adapter")
.poller(Pollers.fixedDelay(3000)))
.handle(Tcp.outboundAdapter(client1))
.get();
}
@Bean
public IntegrationFlow client1In(AbstractClientConnectionFactory client1) {
return IntegrationFlows.from(Tcp.inboundAdapter(client1))
.transform(Transformers.objectToString())
.log(msg -> "client1: " + msg.getPayload())
.get();
}
@Bean
public AbstractClientConnectionFactory client2(SampleProperties properties) {
return Tcp.netClient("localhost", properties.getServerPort()).get();
}
@Bean
public IntegrationFlow client2Out(AbstractClientConnectionFactory client2) {
return IntegrationFlows.from(() -> "Hello from client2", e -> e.id("client2Adapter")
.poller(Pollers.fixedDelay(2000)))
.handle(Tcp.outboundAdapter(client2))
.get();
}
@Bean
public IntegrationFlow client2In(AbstractClientConnectionFactory client2) {
return IntegrationFlows.from(Tcp.inboundAdapter(client2))
.transform(Transformers.objectToString())
.log(msg -> "client2: " + msg.getPayload())
.get();
}
}
@Configuration
class ServerPeer {
private final Set<String> clients = ConcurrentHashMap.newKeySet();
@Bean
public AbstractServerConnectionFactory server(SampleProperties properties) {
return Tcp.netServer(properties.getServerPort()).get();
}
@Bean
public IntegrationFlow serverIn(AbstractServerConnectionFactory server) {
return IntegrationFlows.from(Tcp.inboundAdapter(server))
.transform(Transformers.objectToString())
.log(msg -> "server: " + msg.getPayload())
.get();
}
@Bean
public IntegrationFlow serverOut(AbstractServerConnectionFactory server) {
return IntegrationFlows.from(() -> "seed", e -> e.poller(Pollers.fixedDelay(5000)))
.split(this.clients, "iterator")
.enrichHeaders(h -> h.headerExpression(IpHeaders.CONNECTION_ID, "payload"))
.transform(p -> "Hello from server")
.handle(Tcp.outboundAdapter(server))
.get();
}
@EventListener
public void open(TcpConnectionOpenEvent event) {
if (event.getConnectionFactoryName().equals("server")) {
this.clients.add(event.getConnectionId());
}
}
@EventListener
public void close(TcpConnectionCloseEvent event) {
this.clients.remove(event.getConnectionId());
}
}

View File

@@ -0,0 +1,5 @@
tcp:
server-port: 1234
#logging:
# level:
# org.springframework.integration: debug

View File

@@ -0,0 +1,87 @@
package org.springframework.integration.samples.tcpasyncbi;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.integration.test.context.SpringIntegrationTest;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
@SpringBootTest
@SpringIntegrationTest(noAutoStartup = { "client1Adapter", "client2Adapter" })
class TcpAsyncBiDirectionalApplicationTests {
@Autowired
@Qualifier("client1Adapter")
private SourcePollingChannelAdapter adapter1;
@Autowired
@Qualifier("client2Adapter")
private SourcePollingChannelAdapter adapter2;
@Autowired
@Qualifier("client1In.channel#0")
private AbstractMessageChannel client1In;
@Autowired
@Qualifier("client2In.channel#0")
private AbstractMessageChannel client2In;
@Autowired
@Qualifier("serverIn.channel#0")
private AbstractMessageChannel serverIn;
@Test
void testBothReceive() throws InterruptedException {
CountDownLatch serverLatch = new CountDownLatch(1);
CountDownLatch client1Latch = new CountDownLatch(1);
CountDownLatch client2Latch = new CountDownLatch(1);
this.serverIn.addInterceptor(new ChannelInterceptor() {
@Override
@Nullable
public Message<?> preSend(Message<?> message, MessageChannel channel) {
serverLatch.countDown();
return message;
}
});
this.client1In.addInterceptor(new ChannelInterceptor() {
@Override
@Nullable
public Message<?> preSend(Message<?> message, MessageChannel channel) {
client1Latch.countDown();
return message;
}
});
this.client2In.addInterceptor(new ChannelInterceptor() {
@Override
@Nullable
public Message<?> preSend(Message<?> message, MessageChannel channel) {
client2Latch.countDown();
return message;
}
});
this.adapter1.start();
this.adapter2.start();
assertThat(serverLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(client1Latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(client2Latch.await(10, TimeUnit.SECONDS)).isTrue();
}
}