diff --git a/intermediate/pom.xml b/intermediate/pom.xml index 35fc2373..a86b430a 100644 --- a/intermediate/pom.xml +++ b/intermediate/pom.xml @@ -14,8 +14,9 @@ errorhandling file-processing multipart-http - travel stored-procedures-derby + tcp-client-server-multiplex + travel diff --git a/intermediate/tcp-client-server-multiplex/.springBeans b/intermediate/tcp-client-server-multiplex/.springBeans new file mode 100644 index 00000000..c107e968 --- /dev/null +++ b/intermediate/tcp-client-server-multiplex/.springBeans @@ -0,0 +1,14 @@ + + + 1 + + + + + + + src/main/resources/META-INF/spring/integration/tcpClientServerDemo-context.xml + + + + diff --git a/intermediate/tcp-client-server-multiplex/README.md b/intermediate/tcp-client-server-multiplex/README.md new file mode 100644 index 00000000..4b1d1b42 --- /dev/null +++ b/intermediate/tcp-client-server-multiplex/README.md @@ -0,0 +1,20 @@ +tcp-client-server-multiplex +=========================== + + +If this is your first experience with the spring-integrtion-ip module, start with the **tcp-client-server** project in the basic folder. + + +That project uses outbound and inbound tcp gateways for communication. As discussed in the Spring Integration +Reference Manual, this has some limitations for performance. If a shared socket (single-use="false") is used, +only one message can be processed at a time (on the client side); we must wait for the response to the +current request before we can send the next request. Otherwise, because only the payload is sent over +tcp, the framework cannot correlate responses to requests. + +An alternative is to use a new socket for each message, but this comes with a performance overhead. + +The solution is to use 'Collaborating Channel Adapters' (see SI Reference Manual). In such a scenario, +we can send multiple requests before a response is received. This is termed multiplexing. + +This sample demonstrates how to configure collaborating channel adapters, on both the client and +server sides, and one technique for correlating the responses to the corresponding request. \ No newline at end of file diff --git a/intermediate/tcp-client-server-multiplex/pom.xml b/intermediate/tcp-client-server-multiplex/pom.xml new file mode 100644 index 00000000..774663bd --- /dev/null +++ b/intermediate/tcp-client-server-multiplex/pom.xml @@ -0,0 +1,77 @@ + + + 4.0.0 + org.springframework.integration.samples + tcp-client-server-multiplex + 2.1.0.BUILD-SNAPSHOT + Samples (Intermediate) - TCP Client Server Multiplexing Sample + jar + + 3.1.0.RELEASE + 2.1.0.RC1 + 1.2.16 + 4.10 + + + + org.springframework.integration + spring-integration-ip + ${spring.integration.version} + + + log4j + log4j + ${log4j.version} + + + + junit + junit + ${junit.version} + + + org.springframework + spring-context + ${spring.version} + + + org.springframework + spring-test + ${spring.version} + test + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.5 + 1.5 + -Xlint:all + true + false + + + + + + + repository.springframework.maven.release + Spring Framework Maven Release Repository + http://maven.springframework.org/release + + + repository.springframework.maven.milestone + Spring Framework Maven Milestone Repository + http://maven.springframework.org/milestone + + + repository.springframework.maven.snapshot + Spring Framework Maven Snapshot Repository + http://maven.springframework.org/snapshot + + + diff --git a/intermediate/tcp-client-server-multiplex/src/main/java/org/springframework/integration/samples/tcpclientserver/ByteArrayToStringConverter.java b/intermediate/tcp-client-server-multiplex/src/main/java/org/springframework/integration/samples/tcpclientserver/ByteArrayToStringConverter.java new file mode 100644 index 00000000..096b1d7c --- /dev/null +++ b/intermediate/tcp-client-server-multiplex/src/main/java/org/springframework/integration/samples/tcpclientserver/ByteArrayToStringConverter.java @@ -0,0 +1,57 @@ +/* + * Copyright 2002-2011 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.samples.tcpclientserver; + +import java.io.UnsupportedEncodingException; + +import org.springframework.core.convert.converter.Converter; + +/** + * Simple byte array to String converter; allowing the character set + * to be specified. + * + * @author Gary Russell + * @since 2.1 + * + */ +public class ByteArrayToStringConverter implements Converter { + + private String charSet = "UTF-8"; + + public String convert(byte[] bytes) { + try { + return new String(bytes, this.charSet); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + return new String(bytes); + } + } + + /** + * @return the charSet + */ + public String getCharSet() { + return charSet; + } + + /** + * @param charSet the charSet to set + */ + public void setCharSet(String charSet) { + this.charSet = charSet; + } + +} diff --git a/intermediate/tcp-client-server-multiplex/src/main/java/org/springframework/integration/samples/tcpclientserver/EchoService.java b/intermediate/tcp-client-server-multiplex/src/main/java/org/springframework/integration/samples/tcpclientserver/EchoService.java new file mode 100644 index 00000000..81fe5d48 --- /dev/null +++ b/intermediate/tcp-client-server-multiplex/src/main/java/org/springframework/integration/samples/tcpclientserver/EchoService.java @@ -0,0 +1,35 @@ +/* + * Copyright 2002-2010 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.samples.tcpclientserver; + +/** + * Simple service that receives data in a byte array, + * converts it to a String and appends it with ':echo'. + * + * @author Gary Russell + * @since 2.1 + * + */ +public class EchoService { + + public String test(String input) { + if ("FAIL".equals(input)) { + throw new RuntimeException("Failure Demonstration"); + } + return input + ":echo"; + } + +} \ No newline at end of file diff --git a/intermediate/tcp-client-server-multiplex/src/main/java/org/springframework/integration/samples/tcpclientserver/SimpleGateway.java b/intermediate/tcp-client-server-multiplex/src/main/java/org/springframework/integration/samples/tcpclientserver/SimpleGateway.java new file mode 100644 index 00000000..e42aaa22 --- /dev/null +++ b/intermediate/tcp-client-server-multiplex/src/main/java/org/springframework/integration/samples/tcpclientserver/SimpleGateway.java @@ -0,0 +1,27 @@ +/* + * Copyright 2002-2010 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.samples.tcpclientserver; + +/** + * @author Gary Russell + * @since 2.1 + * + */ +public interface SimpleGateway { + + public String send(String text); + +} \ No newline at end of file diff --git a/intermediate/tcp-client-server-multiplex/src/main/resources/META-INF/spring/integration/tcpClientServerDemo-conversion-context.xml b/intermediate/tcp-client-server-multiplex/src/main/resources/META-INF/spring/integration/tcpClientServerDemo-conversion-context.xml new file mode 100644 index 00000000..2a0765e8 --- /dev/null +++ b/intermediate/tcp-client-server-multiplex/src/main/resources/META-INF/spring/integration/tcpClientServerDemo-conversion-context.xml @@ -0,0 +1,100 @@ + + + + + Uses conversion service and collaborating channel adapters. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/intermediate/tcp-client-server-multiplex/src/main/resources/log4j.xml b/intermediate/tcp-client-server-multiplex/src/main/resources/log4j.xml new file mode 100644 index 00000000..136a0e95 --- /dev/null +++ b/intermediate/tcp-client-server-multiplex/src/main/resources/log4j.xml @@ -0,0 +1,28 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/intermediate/tcp-client-server-multiplex/src/test/java/org/springframework/integration/samples/tcpclientserver/TcpClientServerDemoTest.java b/intermediate/tcp-client-server-multiplex/src/test/java/org/springframework/integration/samples/tcpclientserver/TcpClientServerDemoTest.java new file mode 100644 index 00000000..2747c24d --- /dev/null +++ b/intermediate/tcp-client-server-multiplex/src/test/java/org/springframework/integration/samples/tcpclientserver/TcpClientServerDemoTest.java @@ -0,0 +1,78 @@ +/* + * Copyright 2002-2010 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.samples.tcpclientserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.core.task.TaskExecutor; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + + +/** + * Demonstrates the use of a gateway as an entry point into the integration flow, + * with two pairs of collaborating channel adapters (client and server), which + * enables multiplexing multiple messages over the same connection. + * + * Requires correlation data in the payload. + * + * @author Gary Russell + * @since 2.1 + * + */ +@ContextConfiguration("/META-INF/spring/integration/tcpClientServerDemo-conversion-context.xml") +@RunWith(SpringJUnit4ClassRunner.class) +public class TcpClientServerDemoTest { + + @Autowired + SimpleGateway gw; + + @Test + public void testHappyDay() { + String result = gw.send("999Hello world!"); // first 3 bytes is correlationid + assertEquals("999Hello world!:echo", result); + } + + @Test + public void testMultiPlex() throws Exception { + TaskExecutor executor = new SimpleAsyncTaskExecutor(); + final CountDownLatch latch = new CountDownLatch(100); + final Set results = new HashSet(); + for (int i = 100; i < 200; i++) { + results.add(i); + final int j = i; + executor.execute(new Runnable() { + public void run() { + String result = gw.send(j + "Hello world!"); // first 3 bytes is correlationid + assertEquals(j + "Hello world!:echo", result); + results.remove(j); + latch.countDown(); + }}); + } + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertEquals(0, results.size()); + } +}