GH-234: Add TCP Broadcast Sample

Resolves https://github.com/spring-projects/spring-integration-samples/issues/234

* Update top level README
This commit is contained in:
Gary Russell
2018-11-19 16:11:33 -05:00
committed by Artem Bilan
parent 2a88598aea
commit e7180b8725
7 changed files with 456 additions and 0 deletions

2
.gitignore vendored
View File

@@ -19,3 +19,5 @@ activemq-data
/.gradle
build/
/classes
.mvn
mvnw*

View File

@@ -68,6 +68,7 @@ This is a good place to get started. The samples here are technically motivated
* **quote** - Example demoing core EIP support using **Channel Adapter (Inbound and Stdout)**, **Poller** with Interval Trigers, **Service Activator**
* **sftp** - Demonstrating SFTP support using **SFTP Inbound / Outbound Channel Adapters**
* **tcp-amqp** - Demonstrates basic functionality of bridging the **Spring Integration TCP Adapters** with **Spring Integration AMQP Adapters**
* **tcp-broadcast** - Demonstrates broadcasting a message to multiple connected TCP clients.
* **tcp-client-server** - Demonstrates socket communication using **TcpOutboundGateway**, **TcpInboundGateway** and also uses a **Gateway** and a **Service Activator**
* **testing-examples** - A series of test cases that show techniques to **test** Spring Integration applications.
* **twitter** - Illustrates Twitter support using the **Twitter Inbound Channel Adapter**, **Twitter Inbound Search Channel Adapter**, **Twitter Outbound Channel Adapter**

View File

@@ -0,0 +1,24 @@
== TCP Sample
This sample demonstrates broadcasting a message received by a web controller to all connected TCP clients.
`curl -X POST http://localhost:8080/broadcast/foo` will send 'foo' to 5 connected clients.
The TCP server listens on port 1234.
`2018-11-19 12:01:48.546 INFO 98411 --- [ main] com.example.TcpBroadcastApplication : Started TcpBroadcastApplication in 1.904 seconds (JVM running for 2.345)`
`connected! from client# 5`
`connected! from client# 4`
`connected! from client# 3`
`connected! from client# 2`
`connected! from client# 1`
`curl -X POST http://localhost:8080/broadcast/foo`
`foo from client# 4`
`foo from client# 5`
`foo from client# 2`
`foo from client# 1`
`foo from client# 3`

205
basic/tcp-broadcast/pom.xml Normal file
View File

@@ -0,0 +1,205 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.0.RELEASE</version>
</parent>
<groupId>org.springframework.integration.samples</groupId>
<artifactId>tcp-broadcast</artifactId>
<version>5.1.1.BUILD-SNAPSHOT</version>
<name>TCP Client Broadcast Sample</name>
<description>TCP Client Broadcast Sample</description>
<url>http://projects.spring.io/spring-integration</url>
<organization>
<name>SpringIO</name>
<url>https://spring.io</url>
</organization>
<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
<distribution>repo</distribution>
</license>
</licenses>
<developers>
<developer>
<id>garyrussell</id>
<name>Gary Russell</name>
<email>grussell@pivotal.io</email>
<roles>
<role>project lead</role>
</roles>
</developer>
<developer>
<id>markfisher</id>
<name>Mark Fisher</name>
<email>mfisher@pivotal.io</email>
<roles>
<role>project founder and lead emeritus</role>
</roles>
</developer>
<developer>
<id>ghillert</id>
<name>Gunnar Hillert</name>
<email>ghillert@pivotal.io</email>
</developer>
<developer>
<id>abilan</id>
<name>Artem Bilan</name>
<email>abilan@pivotal.io</email>
</developer>
</developers>
<scm>
<connection>scm:git:scm:git:git://github.com/spring-projects/spring-integration-samples.git</connection>
<developerConnection>scm:git:scm:git:ssh://git@github.com:spring-projects/spring-integration-samples.git</developerConnection>
<url>https://github.com/spring-projects/spring-integration-samples</url>
</scm>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<scope>compile</scope>
<exclusions>
<exclusion>
<artifactId>jackson-module-kotlin</artifactId>
<groupId>com.fasterxml.jackson.module</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<scope>compile</scope>
<exclusions>
<exclusion>
<artifactId>jackson-module-kotlin</artifactId>
<groupId>com.fasterxml.jackson.module</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ip</artifactId>
<scope>compile</scope>
<exclusions>
<exclusion>
<artifactId>jackson-module-kotlin</artifactId>
<groupId>com.fasterxml.jackson.module</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>jackson-module-kotlin</artifactId>
<groupId>com.fasterxml.jackson.module</groupId>
</exclusion>
<exclusion>
<artifactId>*</artifactId>
<groupId>org.hamcrest</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>jackson-module-kotlin</artifactId>
<groupId>com.fasterxml.jackson.module</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.18.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>jackson-module-kotlin</artifactId>
<groupId>com.fasterxml.jackson.module</groupId>
</exclusion>
<exclusion>
<artifactId>*</artifactId>
<groupId>org.hamcrest</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>jackson-module-kotlin</artifactId>
<groupId>com.fasterxml.jackson.module</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>jackson-module-kotlin</artifactId>
<groupId>com.fasterxml.jackson.module</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<repositories>
<repository>
<id>repo.spring.io.milestone</id>
<name>Spring Framework Maven Milestone Repository</name>
<url>https://repo.spring.io/libs-milestone</url>
</repository>
<repository>
<id>repo.spring.io.snapshot</id>
<name>Spring Framework Maven Snapshot Repository</name>
<url>https://repo.spring.io/libs-snapshot</url>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.1.0.RELEASE</version>
<scope>import</scope>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-framework-bom</artifactId>
<version>5.1.2.RELEASE</version>
<scope>import</scope>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-bom</artifactId>
<version>5.1.0.RELEASE</version>
<scope>import</scope>
<type>pom</type>
</dependency>
</dependencies>
</dependencyManagement>
</project>

View File

@@ -0,0 +1,200 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.samples.tcpbroadcast;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.util.stream.IntStream;
import javax.net.SocketFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.ip.IpHeaders;
import org.springframework.integration.ip.dsl.Tcp;
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
public class TcpBroadcastApplication {
private static final int PORT = 1234;
@Configuration
public static class Config {
/*
* Server connection factory.
*/
@Bean
public AbstractServerConnectionFactory serverFactory() {
return Tcp.netServer(PORT).get();
}
/*
* Inbound adapter - sends "connected!".
*/
@Bean
public IntegrationFlow tcpServer(AbstractServerConnectionFactory serverFactory) {
return IntegrationFlows.from(Tcp.inboundAdapter(serverFactory))
.transform(p -> "connected!")
.channel("toTcp.input")
.get();
}
/*
* Gateway flow for controller.
*/
@Bean
public IntegrationFlow gateway() {
return IntegrationFlows.from(Sender.class)
.channel("toTcp.input")
.get();
}
/*
* Outbound channel adapter flow.
*/
@Bean
public IntegrationFlow toTcp(AbstractServerConnectionFactory serverFactory) {
return f -> f.handle(Tcp.outboundAdapter(serverFactory));
}
/*
* Excutor for clients.
*/
@Bean
public ThreadPoolTaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(5);
return exec;
}
/*
* Start 5 clients.
*/
@Bean
public ApplicationRunner runner(TaskExecutor exec, Broadcaster caster) {
return args -> {
IntStream.range(1, 6).forEach(i -> exec.execute(new Client()));
};
}
}
/*
* Sender gateway sets the connection id header.
*/
public interface Sender {
void send(String payload, @Header(IpHeaders.CONNECTION_ID) String connectionId);
}
@RestController
public static class Controller {
@Autowired
private Broadcaster broadcaster;
@Autowired
private ConfigurableApplicationContext applicationContext;
@PostMapping("/broadcast/{what}")
public String broadcast(@PathVariable String what) {
this.broadcaster.send(what);
return "sent: " + what;
}
@GetMapping("/shutdown")
public void shutDown() {
this.applicationContext.close();
}
}
@Component
@DependsOn("gateway") // Needed to ensure the gateway flow bean is created first
public static class Broadcaster {
@Autowired
private Sender sender;
@Autowired
private AbstractServerConnectionFactory server;
public void send(String what) {
this.server.getOpenConnectionIds().forEach(cid -> sender.send(what, cid));
}
}
public static class Client implements Runnable {
private static final ByteArrayCrLfSerializer deserializer = new ByteArrayCrLfSerializer();
private static int next;
private final int instance = ++next;
@Override
public void run() {
Socket socket = null;
try {
socket = SocketFactory.getDefault().createSocket("localhost", PORT);
socket.getOutputStream().write("hello\r\n".getBytes());
InputStream is = socket.getInputStream();
while(true) {
System.out.println(new String(deserializer.deserialize(is)) + " from client# " + instance);
}
}
catch (IOException e) {
}
finally {
if (socket != null) {
try {
socket.close();
}
catch (IOException e) {
}
}
}
}
}
public static void main(String[] args) {
SpringApplication.run(TcpBroadcastApplication.class, args);
}
}

View File

@@ -0,0 +1 @@
#logging.level.org.springframework.integration=debug

View File

@@ -902,6 +902,29 @@ project('tcp-amqp') {
}
}
project('tcp-broadcast') {
description = 'TCP Client Broadcast Sample'
apply plugin: 'org.springframework.boot'
dependencies {
compile 'org.springframework.boot:spring-boot-starter-web'
compile 'org.springframework.boot:spring-boot-starter-integration'
compile "org.springframework.integration:spring-integration-ip"
testCompile 'org.springframework.boot:spring-boot-starter-test'
}
bootRun {
main = 'org.springframework.integration.samples.tcpbroadcast.Application'
}
task run(type: JavaExec) {
main 'org.springframework.integration.samples.tcpbroadcast.Application'
classpath = sourceSets.main.runtimeClasspath
}
}
project('tcp-client-server') {
description = 'TCP Client Server Sample'