INTSAMPLES-22: Create sample for inbound tcp-connection-factory using deserializer and serializer

* Added new tests
* Added comments
* Created the custom serializer/deserializer. Fixing some small issues with it
* Changed the sending to use java socket api
* Working, just trying to figure out junit differences
* Found an error in the input message (contained \r\n) fixed now
* added .idea and activemq-data folders to ignore
* Cleaned up unused code, added documentation to the README.md
* Added more comments
* Added Copyright notices
This commit is contained in:
Christian Posta
2012-02-03 23:48:20 -07:00
committed by Gunnar Hillert
parent 9eaea90170
commit 375cc41b1a
15 changed files with 554 additions and 12 deletions

2
.gitignore vendored
View File

@@ -11,3 +11,5 @@ log.roo
*.ipr
*.iws
derby.log
.idea
activemq-data

View File

@@ -47,3 +47,8 @@ This can also be demonstrated with the telnet client thus...
telnet> quit
Connection closed.
A third option exists for converting a stream of bytes to a domain object or message payload. You can hook up different serializers/deserializers at the connection factory which will apply the conversions right when the stream comes in to the gateway and right when it goes out.
See **TcpServerConnectionDeserializeTest** for using a simple (comes with spring) Stx/Etx serializer.
See **TcpServerCustomSerializerTest** for creating and using your own serializers

View File

@@ -47,7 +47,13 @@
<version>${spring.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2011 the original author or authors.
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View File

@@ -0,0 +1,54 @@
/*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.samples.tcpclientserver;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
/**
* @author: ceposta
*/
public class CustomOrder {
private int number;
private String sender;
private String message;
public CustomOrder(int number, String sender) {
this.number = number;
this.sender = sender;
}
public int getNumber() {
return number;
}
public String getSender() {
return sender;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
@Override
public String toString() {
return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
}
}

View File

@@ -0,0 +1,149 @@
/*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.samples.tcpclientserver;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.serializer.Deserializer;
import org.springframework.core.serializer.Serializer;
/**
* This class is used to demonstrate how you can create a custom serializer/deserializer
* to convert a TCP stream into custom objects which your domain-specific code can use.
*
* Since this is custom, it will have to have its own predefined assumptions for dealing
* with the stream. In other words, there will have to be some indication within the
* contents of the stream where the beginning/end is and how to extract the contents
* into something meaningful (like an object). An example would be a fixed-file formatted
* stream with the length encoded in some well known part of the stream (for example, the
* first 8 bytes of the stream?).
*
* This custom serializer/deserializer assumes the first 3 bytes of the stream will be
* considered an Order Number, the next 10 bytes will be the Sender's Name, the next 6 bytes
* represents an left-zero-padded integer that specifies how long the rest of the message
* content is. After that message content is parsed from the stream, the stream is assumed
* to not have anything after it. In your code you could have delimiters to mark the end
* of the stream, or could agree with the client that a valid stream is only n characters,
* etc. Eitherway, since its custom, the client and server must have some predefined
* assumptions in place for the communication to take place.
*
*
* @author: ceposta
*/
public class CustomSerializerDeserializer implements Serializer<CustomOrder>, Deserializer<CustomOrder>{
protected final Log logger = LogFactory.getLog(this.getClass());
private static final int ORDER_NUMBER_LENGTH = 3;
private static final int SENDER_NAME_LENGTH = 10;
private static final int MESSAGE_LENGTH_LENGTH = 6;
/**
* Convert a CustomOrder object into a byte-stream
*
* @param object
* @param outputStream
* @throws IOException
*/
public void serialize(CustomOrder object, OutputStream outputStream) throws IOException {
byte[] number = Integer.toString(object.getNumber()).getBytes();
outputStream.write(number);
byte[] senderName = object.getSender().getBytes();
outputStream.write(senderName);
String lenghtPadded = pad(6, object.getMessage().length());
byte[] length = lenghtPadded.getBytes();
outputStream.write(length);
outputStream.write(object.getMessage().getBytes());
outputStream.flush();
}
private String pad(int desiredLength, int length) {
return StringUtils.leftPad(Integer.toString(length), desiredLength, '0');
}
/**
* Convert a raw byte stream into a CustomOrder
*
* @param inputStream
* @return
* @throws IOException
*/
public CustomOrder deserialize(InputStream inputStream) throws IOException {
int orderNumber = parseOrderNumber(inputStream);
String senderName = parseSenderName(inputStream);
CustomOrder order = new CustomOrder(orderNumber, senderName);
String message = parseMessage(inputStream);
order.setMessage(message);
return order;
}
private String parseMessage(InputStream inputStream) throws IOException {
String lengthString = parseString(inputStream, MESSAGE_LENGTH_LENGTH);
int lengthOfMessage = Integer.valueOf(lengthString);
String message = parseString(inputStream, lengthOfMessage);
return message;
}
private String parseString(InputStream inputStream, int length) throws IOException {
StringBuilder builder = new StringBuilder();
int c;
for (int i = 0; i < length; ++i) {
c = inputStream.read();
checkClosure(c);
builder.append((char)c);
}
return builder.toString();
}
private String parseSenderName(InputStream inputStream) throws IOException {
return parseString(inputStream, SENDER_NAME_LENGTH);
}
private int parseOrderNumber(InputStream inputStream) throws IOException {
String value = parseString(inputStream, ORDER_NUMBER_LENGTH);
return Integer.valueOf(value.toString());
}
/**
* Check whether the byte passed in is the "closed socket" byte
* Note, I put this in here just as an example, but you could just extend the
* {@link org.springframework.integration.ip.tcp.serializer.AbstractByteArraySerializer} class
* which has this method
*
* @param bite
* @throws IOException
*/
protected void checkClosure(int bite) throws IOException {
if (bite < 0) {
logger.debug("Socket closed during message assembly");
throw new IOException("Socket closed during message assembly");
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2010 the original author or authors.
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@ package org.springframework.integration.samples.tcpclientserver;
/**
* Simple service that receives data in a byte array,
* converts it to a String and appends it to 'echo:'.
*
*
* @author Gary Russell
*
*/

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2010 the original author or authors.
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,5 +22,5 @@ package org.springframework.integration.samples.tcpclientserver;
public interface SimpleGateway {
public String send(String text);
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2010 the original author or authors.
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,7 +22,7 @@ import org.springframework.context.support.ClassPathXmlApplicationContext;
* it works fine as a very simple Telnet server - connect using
* telnet localhost 11111 - each time you hit enter you should see your input
* echoed back, preceded by 'echo:'.
*
*
* @author Gary Russell
*
*/

View File

@@ -0,0 +1,66 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-ip="http://www.springframework.org/schema/integration/ip"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/ip http://www.springframework.org/schema/integration/ip/spring-integration-ip-2.0.xsd">
<!-- Client side -->
<int:gateway id="gw"
service-interface="org.springframework.integration.samples.tcpclientserver.SimpleGateway"
default-request-channel="input"/>
<!-- Create a connection for the client gateway that uses the same Stx-Etx deserializer to turn the stream
into the appropriate content (it looks for the Stx byte, extracts anything between it and the Etx byte). We
don't specify the serializer (although we could) because the unit test explicitly shows how the content
to be sent is wrapped by the Stx and Etx bytes. -->
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="22222"
single-use="true"
so-timeout="10000"
deserializer="connectionSerializeDeserialize"
/>
<int:channel id="input" />
<int-ip:tcp-outbound-gateway id="outGateway"
request-channel="input"
reply-channel="clientBytes2StringChannel"
connection-factory="client"
request-timeout="10000"
reply-timeout="10000"
/>
<int:channel id="clientBytes2StringChannel"/>
<int:transformer id="clientBytes2String"
input-channel="clientBytes2StringChannel"
expression="new String(payload)"/>
<!-- Server side -->
<!-- When creating the socket factory on the server side, we specify both the serializer and deserializer
which deals with both accepting a stream formatted with the Stx-Etx bytes as well as sending a stream
formatted with the Stx-Etx bytes. -->
<int-ip:tcp-connection-factory id="serverConnectionFactory"
type="server"
port="22222"
serializer="connectionSerializeDeserialize"
deserializer="connectionSerializeDeserialize"/>
<bean id="connectionSerializeDeserialize" class="org.springframework.integration.ip.tcp.serializer.ByteArrayStxEtxSerializer"/>
<int-ip:tcp-inbound-gateway id="gatewayCrLf"
connection-factory="serverConnectionFactory"
request-channel="incomingServerChannel"
error-channel="errorChannel"/>
<!-- We leave a message listener off of this channel on purpose because we hook
one up before the test actually runs (see the unit test associated with this
context file) -->
<int:channel id="incomingServerChannel" />
</beans>

View File

@@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-ip="http://www.springframework.org/schema/integration/ip"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/ip http://www.springframework.org/schema/integration/ip/spring-integration-ip-2.0.xsd">
<!-- Server side -->
<!-- When creating the socket factory on the server side, we specify both the serializer and deserializer
which deals with both accepting a stream formatted with the Stx-Etx bytes as well as sending a stream
formatted with the Stx-Etx bytes. -->
<int-ip:tcp-connection-factory id="serverConnectionFactory"
type="server"
port="11111"
single-use="true"
so-linger="10000"
serializer="connectionSerializeDeserialize"
deserializer="connectionSerializeDeserialize"/>
<bean id="connectionSerializeDeserialize" class="org.springframework.integration.samples.tcpclientserver.CustomSerializerDeserializer"/>
<int-ip:tcp-inbound-gateway id="gatewayCrLf"
connection-factory="serverConnectionFactory"
request-channel="incomingServerChannel"
error-channel="errorChannel"/>
<!-- We leave a message listener off of this channel on purpose because we hook
one up before the test actually runs (see the unit test associated with this
context file) -->
<int:channel id="incomingServerChannel" />
</beans>

View File

@@ -15,6 +15,10 @@
<level value="warn" />
</logger>
<logger name="org.springframework.integration">
<level value="warn" />
</logger>
<logger name="org.springframework.integration.samples">
<level value="debug" />
</logger>
@@ -24,5 +28,5 @@
<priority value="warn" />
<appender-ref ref="console" />
</root>
</log4j:configuration>

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2010 the original author or authors.
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@@ -27,7 +28,7 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* Demonstrates the use of a gateway as an entry point into the integration flow.
* The message generated by the gateway is sent over tcp by the outbound gateway
* to the inbound gateway. In turn the inbound gateway sends the message to an
* to the inbound gateway. In turn the inbound gateway sends the message to an
* echo service and the echoed response comes back over tcp and is returned to
* the test case for verification.
*
@@ -43,9 +44,10 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
// This one uses the conversion service
//@ContextConfiguration("/META-INF/spring/integration/tcpClientServerDemo-conversion-context.xml")
@RunWith(SpringJUnit4ClassRunner.class)
@DirtiesContext
public class TcpClientServerDemoTest {
@Autowired
@Autowired
SimpleGateway gw;
@Test

View File

@@ -0,0 +1,100 @@
/*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.samples.tcpclientserver;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.core.SubscribableChannel;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.ip.tcp.serializer.ByteArrayStxEtxSerializer;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.io.StringWriter;
import static org.junit.Assert.assertEquals;
/**
* Shows an example of using the Stx/Etx stream framing serializers that are included with
* Spring Integration. We can be confident that the streams are properly handled because we
* explicitly send a stream with the Stx/Etx frame and the beginning and end of the actual
* content and the Server is configured to be able to handle the frame. In the asserts, we
* assert that the payload, once it reaches a component (in this case, the message listener
* we create and attach to the incomingServerChannel), does not have any of the Stx/Etx bytes.
*
* @author: ceposta
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"/META-INF/spring/integration/tcpServerConnectionDeserialize-context.xml"})
@DirtiesContext
public class TcpServerConnectionDeserializeTest {
@Autowired
SimpleGateway gw;
@Autowired
@Qualifier("incomingServerChannel")
MessageChannel incomingServerChannel;
@Test
public void testHappyPath() {
// add a listener to this channel, otherwise there is not one defined
// the reason we use a listener here is so we can assert truths on the
// message and/or payload
SubscribableChannel channel = (SubscribableChannel) incomingServerChannel;
channel.subscribe(new AbstractReplyProducingMessageHandler(){
@Override
protected Object handleRequestMessage(Message<?> requestMessage) {
byte[] payload = (byte[]) requestMessage.getPayload();
// we assert during the processing of the messaging that the
// payload is just the content we wanted to send without the
// framing bytes (STX/ETX)
assertEquals("Hello World!", new String(payload));
return requestMessage;
}
});
String sourceMessage = wrapWithStxEtx("Hello World!");
String result = gw.send(sourceMessage);
System.out.println(result);
assertEquals("Hello World!", result);
}
/**
* Show, explicitly, how the stream would look if you had to manually create it.
*
* See more about TCP synchronous communication for more about framing the stream
* with STX/ETX: http://en.wikipedia.org/wiki/Binary_Synchronous_Communications
*
* @param content
* @return a string that is wrapped with the STX/ETX framing bytes
*/
private String wrapWithStxEtx(String content) {
StringWriter writer = new StringWriter();
writer.write(ByteArrayStxEtxSerializer.STX);
writer.write(content);
writer.write(ByteArrayStxEtxSerializer.ETX);
return writer.toString();
}
}

View File

@@ -0,0 +1,120 @@
/*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.samples.tcpclientserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.core.SubscribableChannel;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* Some use cases may dictate you needing to create your own stream handling serializers
* and deserializers. This sample shows a custom serializer/deserializer being used with
* the Java socket API on the front end (client) and the Spring Integration TCP inbound
* gateway with the custom serializer/deserializers.
*
* @author: ceposta
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"/META-INF/spring/integration/tcpServerCustomSerialize-context.xml"})
@DirtiesContext
public class TcpServerCustomSerializerTest {
@Autowired
@Qualifier("incomingServerChannel")
MessageChannel incomingServerChannel;
@Test
public void testHappyPath() {
// add a listener to this channel, otherwise there is not one defined
// the reason we use a listener here is so we can assert truths on the
// message and/or payload
SubscribableChannel channel = (SubscribableChannel) incomingServerChannel;
channel.subscribe(new AbstractReplyProducingMessageHandler(){
@Override
protected Object handleRequestMessage(Message<?> requestMessage) {
CustomOrder payload = (CustomOrder) requestMessage.getPayload();
// we assert during the processing of the messaging that the
// payload is just the content we wanted to send without the
// framing bytes (STX/ETX)
assertEquals(123, payload.getNumber());
assertEquals("PINGPONG02", payload.getSender());
assertEquals("You got it to work!", payload.getMessage());
return requestMessage;
}
});
String sourceMessage = "123PINGPONG02000019You got it to work!";
// use the java socket API to make the connection to the server
Socket socket = null;
Writer out = null;
BufferedReader in = null;
try {
socket = new Socket("localhost", 11111);
out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
out.write(sourceMessage);
out.flush();
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
StringBuffer str = new StringBuffer();
int c;
while ((c = in.read()) != -1) {
str.append((char) c);
}
String response = str.toString();
assertEquals(sourceMessage, response);
} catch (IOException e) {
fail("Test ended with an exception " + e.getMessage());
}
finally {
try {
socket.close();
out.close();
in.close();
} catch (Exception e) {
// swallow exception
}
}
}
}