INTSAMPLES-14 Tcp Collaborating Adapters Sample

Add a sample showing how to use collaborating channel adapters
for high performance.
This commit is contained in:
Gary Russell
2011-12-21 22:01:39 -05:00
committed by Gunnar Hillert
parent d84e98a781
commit ea9a89d820
10 changed files with 438 additions and 1 deletions

View File

@@ -14,8 +14,9 @@
<module>errorhandling</module>
<module>file-processing</module>
<module>multipart-http</module>
<module>travel</module>
<module>stored-procedures-derby</module>
<module>tcp-client-server-multiplex</module>
<module>travel</module>
</modules>
</project>

View File

@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<beansProjectDescription>
<version>1</version>
<pluginVersion><![CDATA[2.5.0.201008251000-M3]]></pluginVersion>
<configSuffixes>
<configSuffix><![CDATA[xml]]></configSuffix>
</configSuffixes>
<enableImports><![CDATA[false]]></enableImports>
<configs>
<config>src/main/resources/META-INF/spring/integration/tcpClientServerDemo-context.xml</config>
</configs>
<configSets>
</configSets>
</beansProjectDescription>

View File

@@ -0,0 +1,20 @@
tcp-client-server-multiplex
===========================
If this is your first experience with the spring-integrtion-ip module, start with the **tcp-client-server** project in the basic folder.
That project uses outbound and inbound tcp gateways for communication. As discussed in the Spring Integration
Reference Manual, this has some limitations for performance. If a shared socket (single-use="false") is used,
only one message can be processed at a time (on the client side); we must wait for the response to the
current request before we can send the next request. Otherwise, because only the payload is sent over
tcp, the framework cannot correlate responses to requests.
An alternative is to use a new socket for each message, but this comes with a performance overhead.
The solution is to use 'Collaborating Channel Adapters' (see SI Reference Manual). In such a scenario,
we can send multiple requests before a response is received. This is termed multiplexing.
This sample demonstrates how to configure collaborating channel adapters, on both the client and
server sides, and one technique for correlating the responses to the corresponding request.

View File

@@ -0,0 +1,77 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.springframework.integration.samples</groupId>
<artifactId>tcp-client-server-multiplex</artifactId>
<version>2.1.0.BUILD-SNAPSHOT</version>
<name>Samples (Intermediate) - TCP Client Server Multiplexing Sample</name>
<packaging>jar</packaging>
<properties>
<spring.version>3.1.0.RELEASE</spring.version>
<spring.integration.version>2.1.0.RC1</spring.integration.version>
<log4j.version>1.2.16</log4j.version>
<junit.version>4.10</junit.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ip</artifactId>
<version>${spring.integration.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<!-- test-scoped dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.5</source>
<target>1.5</target>
<compilerArgument>-Xlint:all</compilerArgument>
<showWarnings>true</showWarnings>
<showDeprecation>false</showDeprecation>
</configuration>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>repository.springframework.maven.release</id>
<name>Spring Framework Maven Release Repository</name>
<url>http://maven.springframework.org/release</url>
</repository>
<repository>
<id>repository.springframework.maven.milestone</id>
<name>Spring Framework Maven Milestone Repository</name>
<url>http://maven.springframework.org/milestone</url>
</repository>
<repository>
<id>repository.springframework.maven.snapshot</id>
<name>Spring Framework Maven Snapshot Repository</name>
<url>http://maven.springframework.org/snapshot</url>
</repository>
</repositories>
</project>

View File

@@ -0,0 +1,57 @@
/*
* Copyright 2002-2011 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.samples.tcpclientserver;
import java.io.UnsupportedEncodingException;
import org.springframework.core.convert.converter.Converter;
/**
* Simple byte array to String converter; allowing the character set
* to be specified.
*
* @author Gary Russell
* @since 2.1
*
*/
public class ByteArrayToStringConverter implements Converter<byte[], String> {
private String charSet = "UTF-8";
public String convert(byte[] bytes) {
try {
return new String(bytes, this.charSet);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return new String(bytes);
}
}
/**
* @return the charSet
*/
public String getCharSet() {
return charSet;
}
/**
* @param charSet the charSet to set
*/
public void setCharSet(String charSet) {
this.charSet = charSet;
}
}

View File

@@ -0,0 +1,35 @@
/*
* Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.samples.tcpclientserver;
/**
* Simple service that receives data in a byte array,
* converts it to a String and appends it with ':echo'.
*
* @author Gary Russell
* @since 2.1
*
*/
public class EchoService {
public String test(String input) {
if ("FAIL".equals(input)) {
throw new RuntimeException("Failure Demonstration");
}
return input + ":echo";
}
}

View File

@@ -0,0 +1,27 @@
/*
* Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.samples.tcpclientserver;
/**
* @author Gary Russell
* @since 2.1
*
*/
public interface SimpleGateway {
public String send(String text);
}

View File

@@ -0,0 +1,100 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.springframework.org/schema/integration"
xmlns:ip="http://www.springframework.org/schema/integration/ip"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/ip http://www.springframework.org/schema/integration/ip/spring-integration-ip.xsd">
<beans:description>
Uses conversion service and collaborating channel adapters.
</beans:description>
<converter>
<beans:bean class="org.springframework.integration.samples.tcpclientserver.ByteArrayToStringConverter" />
</converter>
<!-- Given we are looking for performance, let's use
the most performant wire protocol. -->
<beans:bean id="fastestWireFormatSerializer" class="org.springframework.integration.ip.tcp.serializer.ByteArrayLengthHeaderSerializer">
<beans:constructor-arg value="1" />
</beans:bean>
<!-- Client side -->
<gateway id="gw"
service-interface="org.springframework.integration.samples.tcpclientserver.SimpleGateway"
default-request-channel="input" />
<ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="11111"
single-use="false"
serializer="fastestWireFormatSerializer"
deserializer="fastestWireFormatSerializer"
so-timeout="10000" />
<publish-subscribe-channel id="input" />
<ip:tcp-outbound-channel-adapter id="outAdapter.client"
order="2"
channel="input"
connection-factory="client" /> <!-- Collaborator -->
<!-- Also send a copy to the custom aggregator for correlation and
so this message's replyChannel will be transferred to the
aggregated message.
The order ensures this gets to the aggregator first -->
<bridge input-channel="input" output-channel="toAggregator.client"
order="1"/>
<!-- Asynch receive reply -->
<ip:tcp-inbound-channel-adapter id="inAdapter.client"
channel="toAggregator.client"
connection-factory="client" /> <!-- Collaborator -->
<!-- dataType attribute invokes the conversion service, if necessary -->
<channel id="toAggregator.client" datatype="java.lang.String" />
<aggregator input-channel="toAggregator.client"
output-channel="toTransformer.client"
correlation-strategy-expression="payload.substring(0,3)"
release-strategy-expression="size() == 2" />
<transformer input-channel="toTransformer.client"
expression="payload.get(1)"/> <!-- The response is always second -->
<!-- Server side -->
<ip:tcp-connection-factory id="server"
type="server"
port="11111"
using-nio="true"
serializer="fastestWireFormatSerializer"
deserializer="fastestWireFormatSerializer" />
<ip:tcp-inbound-channel-adapter id="inAdapter.server"
channel="toSA"
connection-factory="server" />
<!-- dataType attribute invokes the conversion service -->
<channel id="toSA" datatype="java.lang.String" />
<service-activator input-channel="toSA"
output-channel="toObAdapter"
ref="echoService"
method="test" />
<beans:bean id="echoService"
class="org.springframework.integration.samples.tcpclientserver.EchoService" />
<channel id="toObAdapter" />
<ip:tcp-outbound-channel-adapter id="outAdapter.server"
channel="toObAdapter"
connection-factory="server" />
</beans:beans>

View File

@@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<!-- Appenders -->
<appender name="console" class="org.apache.log4j.ConsoleAppender">
<param name="Target" value="System.out" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%-5p: %c - %m%n" />
</layout>
</appender>
<!-- Loggers -->
<logger name="org.springframework">
<level value="info" />
</logger>
<logger name="org.springframework.integration.samples">
<level value="debug" />
</logger>
<!-- Root Logger -->
<root>
<priority value="warn" />
<appender-ref ref="console" />
</root>
</log4j:configuration>

View File

@@ -0,0 +1,78 @@
/*
* Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.samples.tcpclientserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* Demonstrates the use of a gateway as an entry point into the integration flow,
* with two pairs of collaborating channel adapters (client and server), which
* enables multiplexing multiple messages over the same connection.
*
* Requires correlation data in the payload.
*
* @author Gary Russell
* @since 2.1
*
*/
@ContextConfiguration("/META-INF/spring/integration/tcpClientServerDemo-conversion-context.xml")
@RunWith(SpringJUnit4ClassRunner.class)
public class TcpClientServerDemoTest {
@Autowired
SimpleGateway gw;
@Test
public void testHappyDay() {
String result = gw.send("999Hello world!"); // first 3 bytes is correlationid
assertEquals("999Hello world!:echo", result);
}
@Test
public void testMultiPlex() throws Exception {
TaskExecutor executor = new SimpleAsyncTaskExecutor();
final CountDownLatch latch = new CountDownLatch(100);
final Set<Integer> results = new HashSet<Integer>();
for (int i = 100; i < 200; i++) {
results.add(i);
final int j = i;
executor.execute(new Runnable() {
public void run() {
String result = gw.send(j + "Hello world!"); // first 3 bytes is correlationid
assertEquals(j + "Hello world!:echo", result);
results.remove(j);
latch.countDown();
}});
}
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertEquals(0, results.size());
}
}