diff --git a/.gitignore b/.gitignore index 278e4bd1..e06881e9 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,5 @@ log.roo *.ipr *.iws derby.log +.idea +activemq-data diff --git a/basic/tcp-client-server/README.md b/basic/tcp-client-server/README.md index c4f05803..4ec514a9 100644 --- a/basic/tcp-client-server/README.md +++ b/basic/tcp-client-server/README.md @@ -47,3 +47,8 @@ This can also be demonstrated with the telnet client thus... telnet> quit Connection closed. +A third option exists for converting a stream of bytes to a domain object or message payload. You can hook up different serializers/deserializers at the connection factory which will apply the conversions right when the stream comes in to the gateway and right when it goes out. + +See **TcpServerConnectionDeserializeTest** for using a simple (comes with spring) Stx/Etx serializer. + +See **TcpServerCustomSerializerTest** for creating and using your own serializers \ No newline at end of file diff --git a/basic/tcp-client-server/pom.xml b/basic/tcp-client-server/pom.xml index b43cb304..aa558496 100644 --- a/basic/tcp-client-server/pom.xml +++ b/basic/tcp-client-server/pom.xml @@ -47,7 +47,13 @@ ${spring.version} test - + + commons-lang + commons-lang + 2.6 + + + diff --git a/basic/tcp-client-server/src/main/java/org/springframework/integration/samples/tcpclientserver/ByteArrayToStringConverter.java b/basic/tcp-client-server/src/main/java/org/springframework/integration/samples/tcpclientserver/ByteArrayToStringConverter.java index 096b1d7c..80acafba 100644 --- a/basic/tcp-client-server/src/main/java/org/springframework/integration/samples/tcpclientserver/ByteArrayToStringConverter.java +++ b/basic/tcp-client-server/src/main/java/org/springframework/integration/samples/tcpclientserver/ByteArrayToStringConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2011 the original author or authors. + * Copyright 2002-2012 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/basic/tcp-client-server/src/main/java/org/springframework/integration/samples/tcpclientserver/CustomOrder.java b/basic/tcp-client-server/src/main/java/org/springframework/integration/samples/tcpclientserver/CustomOrder.java new file mode 100644 index 00000000..9ab1513f --- /dev/null +++ b/basic/tcp-client-server/src/main/java/org/springframework/integration/samples/tcpclientserver/CustomOrder.java @@ -0,0 +1,54 @@ +/* + * Copyright 2002-2012 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.samples.tcpclientserver; + +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; + +/** + * @author: ceposta + */ +public class CustomOrder { + private int number; + private String sender; + private String message; + + public CustomOrder(int number, String sender) { + this.number = number; + this.sender = sender; + } + + public int getNumber() { + return number; + } + + public String getSender() { + return sender; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); + } +} diff --git a/basic/tcp-client-server/src/main/java/org/springframework/integration/samples/tcpclientserver/CustomSerializerDeserializer.java b/basic/tcp-client-server/src/main/java/org/springframework/integration/samples/tcpclientserver/CustomSerializerDeserializer.java new file mode 100644 index 00000000..9beaa8b4 --- /dev/null +++ b/basic/tcp-client-server/src/main/java/org/springframework/integration/samples/tcpclientserver/CustomSerializerDeserializer.java @@ -0,0 +1,149 @@ +/* + * Copyright 2002-2012 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.samples.tcpclientserver; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.core.serializer.Deserializer; +import org.springframework.core.serializer.Serializer; + +/** + * This class is used to demonstrate how you can create a custom serializer/deserializer + * to convert a TCP stream into custom objects which your domain-specific code can use. + * + * Since this is custom, it will have to have its own predefined assumptions for dealing + * with the stream. In other words, there will have to be some indication within the + * contents of the stream where the beginning/end is and how to extract the contents + * into something meaningful (like an object). An example would be a fixed-file formatted + * stream with the length encoded in some well known part of the stream (for example, the + * first 8 bytes of the stream?). + * + * This custom serializer/deserializer assumes the first 3 bytes of the stream will be + * considered an Order Number, the next 10 bytes will be the Sender's Name, the next 6 bytes + * represents an left-zero-padded integer that specifies how long the rest of the message + * content is. After that message content is parsed from the stream, the stream is assumed + * to not have anything after it. In your code you could have delimiters to mark the end + * of the stream, or could agree with the client that a valid stream is only n characters, + * etc. Eitherway, since its custom, the client and server must have some predefined + * assumptions in place for the communication to take place. + * + * + * @author: ceposta + */ +public class CustomSerializerDeserializer implements Serializer, Deserializer{ + + protected final Log logger = LogFactory.getLog(this.getClass()); + + private static final int ORDER_NUMBER_LENGTH = 3; + private static final int SENDER_NAME_LENGTH = 10; + private static final int MESSAGE_LENGTH_LENGTH = 6; + + + /** + * Convert a CustomOrder object into a byte-stream + * + * @param object + * @param outputStream + * @throws IOException + */ + public void serialize(CustomOrder object, OutputStream outputStream) throws IOException { + byte[] number = Integer.toString(object.getNumber()).getBytes(); + outputStream.write(number); + + byte[] senderName = object.getSender().getBytes(); + outputStream.write(senderName); + + String lenghtPadded = pad(6, object.getMessage().length()); + byte[] length = lenghtPadded.getBytes(); + outputStream.write(length); + + outputStream.write(object.getMessage().getBytes()); + outputStream.flush(); + } + + private String pad(int desiredLength, int length) { + return StringUtils.leftPad(Integer.toString(length), desiredLength, '0'); + } + + /** + * Convert a raw byte stream into a CustomOrder + * + * @param inputStream + * @return + * @throws IOException + */ + public CustomOrder deserialize(InputStream inputStream) throws IOException { + int orderNumber = parseOrderNumber(inputStream); + String senderName = parseSenderName(inputStream); + + CustomOrder order = new CustomOrder(orderNumber, senderName); + String message = parseMessage(inputStream); + order.setMessage(message); + return order; + } + + private String parseMessage(InputStream inputStream) throws IOException { + String lengthString = parseString(inputStream, MESSAGE_LENGTH_LENGTH); + int lengthOfMessage = Integer.valueOf(lengthString); + + String message = parseString(inputStream, lengthOfMessage); + return message; + } + + private String parseString(InputStream inputStream, int length) throws IOException { + StringBuilder builder = new StringBuilder(); + + int c; + for (int i = 0; i < length; ++i) { + c = inputStream.read(); + checkClosure(c); + builder.append((char)c); + } + + return builder.toString(); + } + + private String parseSenderName(InputStream inputStream) throws IOException { + return parseString(inputStream, SENDER_NAME_LENGTH); + } + + private int parseOrderNumber(InputStream inputStream) throws IOException { + String value = parseString(inputStream, ORDER_NUMBER_LENGTH); + return Integer.valueOf(value.toString()); + } + + + /** + * Check whether the byte passed in is the "closed socket" byte + * Note, I put this in here just as an example, but you could just extend the + * {@link org.springframework.integration.ip.tcp.serializer.AbstractByteArraySerializer} class + * which has this method + * + * @param bite + * @throws IOException + */ + protected void checkClosure(int bite) throws IOException { + if (bite < 0) { + logger.debug("Socket closed during message assembly"); + throw new IOException("Socket closed during message assembly"); + } + } +} diff --git a/basic/tcp-client-server/src/main/java/org/springframework/integration/samples/tcpclientserver/EchoService.java b/basic/tcp-client-server/src/main/java/org/springframework/integration/samples/tcpclientserver/EchoService.java index e0015dc9..d63e1810 100644 --- a/basic/tcp-client-server/src/main/java/org/springframework/integration/samples/tcpclientserver/EchoService.java +++ b/basic/tcp-client-server/src/main/java/org/springframework/integration/samples/tcpclientserver/EchoService.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2010 the original author or authors. + * Copyright 2002-2012 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ package org.springframework.integration.samples.tcpclientserver; /** * Simple service that receives data in a byte array, * converts it to a String and appends it to 'echo:'. - * + * * @author Gary Russell * */ diff --git a/basic/tcp-client-server/src/main/java/org/springframework/integration/samples/tcpclientserver/SimpleGateway.java b/basic/tcp-client-server/src/main/java/org/springframework/integration/samples/tcpclientserver/SimpleGateway.java index df7d1b51..c6c7dcc4 100644 --- a/basic/tcp-client-server/src/main/java/org/springframework/integration/samples/tcpclientserver/SimpleGateway.java +++ b/basic/tcp-client-server/src/main/java/org/springframework/integration/samples/tcpclientserver/SimpleGateway.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2010 the original author or authors. + * Copyright 2002-2012 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,5 +22,5 @@ package org.springframework.integration.samples.tcpclientserver; public interface SimpleGateway { public String send(String text); - + } \ No newline at end of file diff --git a/basic/tcp-client-server/src/main/java/org/springframework/integration/samples/tcpclientserver/TelnetServer.java b/basic/tcp-client-server/src/main/java/org/springframework/integration/samples/tcpclientserver/TelnetServer.java index 99d97e5e..4d471ad0 100644 --- a/basic/tcp-client-server/src/main/java/org/springframework/integration/samples/tcpclientserver/TelnetServer.java +++ b/basic/tcp-client-server/src/main/java/org/springframework/integration/samples/tcpclientserver/TelnetServer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2010 the original author or authors. + * Copyright 2002-2012 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,7 @@ import org.springframework.context.support.ClassPathXmlApplicationContext; * it works fine as a very simple Telnet server - connect using * telnet localhost 11111 - each time you hit enter you should see your input * echoed back, preceded by 'echo:'. - * + * * @author Gary Russell * */ diff --git a/basic/tcp-client-server/src/main/resources/META-INF/spring/integration/tcpServerConnectionDeserialize-context.xml b/basic/tcp-client-server/src/main/resources/META-INF/spring/integration/tcpServerConnectionDeserialize-context.xml new file mode 100644 index 00000000..174bf73e --- /dev/null +++ b/basic/tcp-client-server/src/main/resources/META-INF/spring/integration/tcpServerConnectionDeserialize-context.xml @@ -0,0 +1,66 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/basic/tcp-client-server/src/main/resources/META-INF/spring/integration/tcpServerCustomSerialize-context.xml b/basic/tcp-client-server/src/main/resources/META-INF/spring/integration/tcpServerCustomSerialize-context.xml new file mode 100644 index 00000000..32887a62 --- /dev/null +++ b/basic/tcp-client-server/src/main/resources/META-INF/spring/integration/tcpServerCustomSerialize-context.xml @@ -0,0 +1,34 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/basic/tcp-client-server/src/main/resources/log4j.xml b/basic/tcp-client-server/src/main/resources/log4j.xml index f391e2ed..4c5f6f83 100644 --- a/basic/tcp-client-server/src/main/resources/log4j.xml +++ b/basic/tcp-client-server/src/main/resources/log4j.xml @@ -15,6 +15,10 @@ + + + + @@ -24,5 +28,5 @@ - + \ No newline at end of file diff --git a/basic/tcp-client-server/src/test/java/org/springframework/integration/samples/tcpclientserver/TcpClientServerDemoTest.java b/basic/tcp-client-server/src/test/java/org/springframework/integration/samples/tcpclientserver/TcpClientServerDemoTest.java index 50c830b4..bdff84ea 100644 --- a/basic/tcp-client-server/src/test/java/org/springframework/integration/samples/tcpclientserver/TcpClientServerDemoTest.java +++ b/basic/tcp-client-server/src/test/java/org/springframework/integration/samples/tcpclientserver/TcpClientServerDemoTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2010 the original author or authors. + * Copyright 2002-2012 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @@ -27,7 +28,7 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; /** * Demonstrates the use of a gateway as an entry point into the integration flow. * The message generated by the gateway is sent over tcp by the outbound gateway - * to the inbound gateway. In turn the inbound gateway sends the message to an + * to the inbound gateway. In turn the inbound gateway sends the message to an * echo service and the echoed response comes back over tcp and is returned to * the test case for verification. * @@ -43,9 +44,10 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; // This one uses the conversion service //@ContextConfiguration("/META-INF/spring/integration/tcpClientServerDemo-conversion-context.xml") @RunWith(SpringJUnit4ClassRunner.class) +@DirtiesContext public class TcpClientServerDemoTest { - @Autowired + @Autowired SimpleGateway gw; @Test diff --git a/basic/tcp-client-server/src/test/java/org/springframework/integration/samples/tcpclientserver/TcpServerConnectionDeserializeTest.java b/basic/tcp-client-server/src/test/java/org/springframework/integration/samples/tcpclientserver/TcpServerConnectionDeserializeTest.java new file mode 100644 index 00000000..335197c9 --- /dev/null +++ b/basic/tcp-client-server/src/test/java/org/springframework/integration/samples/tcpclientserver/TcpServerConnectionDeserializeTest.java @@ -0,0 +1,100 @@ +/* + * Copyright 2002-2012 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.samples.tcpclientserver; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.integration.Message; +import org.springframework.integration.MessageChannel; +import org.springframework.integration.core.SubscribableChannel; +import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; +import org.springframework.integration.ip.tcp.serializer.ByteArrayStxEtxSerializer; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import java.io.StringWriter; + +import static org.junit.Assert.assertEquals; + +/** + * Shows an example of using the Stx/Etx stream framing serializers that are included with + * Spring Integration. We can be confident that the streams are properly handled because we + * explicitly send a stream with the Stx/Etx frame and the beginning and end of the actual + * content and the Server is configured to be able to handle the frame. In the asserts, we + * assert that the payload, once it reaches a component (in this case, the message listener + * we create and attach to the incomingServerChannel), does not have any of the Stx/Etx bytes. + * + * @author: ceposta + */ +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(locations = {"/META-INF/spring/integration/tcpServerConnectionDeserialize-context.xml"}) +@DirtiesContext +public class TcpServerConnectionDeserializeTest { + + @Autowired + SimpleGateway gw; + + @Autowired + @Qualifier("incomingServerChannel") + MessageChannel incomingServerChannel; + + @Test + public void testHappyPath() { + + // add a listener to this channel, otherwise there is not one defined + // the reason we use a listener here is so we can assert truths on the + // message and/or payload + SubscribableChannel channel = (SubscribableChannel) incomingServerChannel; + channel.subscribe(new AbstractReplyProducingMessageHandler(){ + + @Override + protected Object handleRequestMessage(Message requestMessage) { + byte[] payload = (byte[]) requestMessage.getPayload(); + + // we assert during the processing of the messaging that the + // payload is just the content we wanted to send without the + // framing bytes (STX/ETX) + assertEquals("Hello World!", new String(payload)); + return requestMessage; + } + }); + + String sourceMessage = wrapWithStxEtx("Hello World!"); + String result = gw.send(sourceMessage); + System.out.println(result); + assertEquals("Hello World!", result); + } + + /** + * Show, explicitly, how the stream would look if you had to manually create it. + * + * See more about TCP synchronous communication for more about framing the stream + * with STX/ETX: http://en.wikipedia.org/wiki/Binary_Synchronous_Communications + * + * @param content + * @return a string that is wrapped with the STX/ETX framing bytes + */ + private String wrapWithStxEtx(String content) { + StringWriter writer = new StringWriter(); + writer.write(ByteArrayStxEtxSerializer.STX); + writer.write(content); + writer.write(ByteArrayStxEtxSerializer.ETX); + return writer.toString(); + } +} diff --git a/basic/tcp-client-server/src/test/java/org/springframework/integration/samples/tcpclientserver/TcpServerCustomSerializerTest.java b/basic/tcp-client-server/src/test/java/org/springframework/integration/samples/tcpclientserver/TcpServerCustomSerializerTest.java new file mode 100644 index 00000000..14a2396f --- /dev/null +++ b/basic/tcp-client-server/src/test/java/org/springframework/integration/samples/tcpclientserver/TcpServerCustomSerializerTest.java @@ -0,0 +1,120 @@ +/* + * Copyright 2002-2012 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.samples.tcpclientserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.net.Socket; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.integration.Message; +import org.springframework.integration.MessageChannel; +import org.springframework.integration.core.SubscribableChannel; +import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +/** + * Some use cases may dictate you needing to create your own stream handling serializers + * and deserializers. This sample shows a custom serializer/deserializer being used with + * the Java socket API on the front end (client) and the Spring Integration TCP inbound + * gateway with the custom serializer/deserializers. + * + * @author: ceposta + */ +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(locations = {"/META-INF/spring/integration/tcpServerCustomSerialize-context.xml"}) +@DirtiesContext +public class TcpServerCustomSerializerTest { + + @Autowired + @Qualifier("incomingServerChannel") + MessageChannel incomingServerChannel; + + @Test + public void testHappyPath() { + + // add a listener to this channel, otherwise there is not one defined + // the reason we use a listener here is so we can assert truths on the + // message and/or payload + SubscribableChannel channel = (SubscribableChannel) incomingServerChannel; + channel.subscribe(new AbstractReplyProducingMessageHandler(){ + + @Override + protected Object handleRequestMessage(Message requestMessage) { + CustomOrder payload = (CustomOrder) requestMessage.getPayload(); + + // we assert during the processing of the messaging that the + // payload is just the content we wanted to send without the + // framing bytes (STX/ETX) + assertEquals(123, payload.getNumber()); + assertEquals("PINGPONG02", payload.getSender()); + assertEquals("You got it to work!", payload.getMessage()); + return requestMessage; + } + }); + + String sourceMessage = "123PINGPONG02000019You got it to work!"; + + + // use the java socket API to make the connection to the server + Socket socket = null; + Writer out = null; + BufferedReader in = null; + try { + socket = new Socket("localhost", 11111); + out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); + out.write(sourceMessage); + out.flush(); + + in = new BufferedReader(new InputStreamReader(socket.getInputStream())); + StringBuffer str = new StringBuffer(); + + int c; + while ((c = in.read()) != -1) { + str.append((char) c); + } + + String response = str.toString(); + assertEquals(sourceMessage, response); + + } catch (IOException e) { + fail("Test ended with an exception " + e.getMessage()); + } + finally { + try { + socket.close(); + out.close(); + in.close(); + + } catch (Exception e) { + // swallow exception + } + + } + } +}