INTEXT-45 SI 3.0.0.M1 - now 1.0.0.BUILD-SNAPSHOT

- Use core TCP eventing for WebSocketEvents; change Type literals to enumerations
- Use a WebSocketMessageMapper instead of rebuilding the message in the interceptor
- Use core TCP Abstract Serializer
- Use 3.0 API instead of reflection to get key for Deserializer state
This commit is contained in:
Gary Russell
2013-02-15 13:59:39 -05:00
parent f2d3a87946
commit 870f76b58f
15 changed files with 147 additions and 360 deletions

View File

@@ -0,0 +1,2 @@
reports
fuzz*

View File

@@ -33,7 +33,7 @@ ext {
log4jVersion = '1.2.12' log4jVersion = '1.2.12'
mockitoVersion = '1.9.0' mockitoVersion = '1.9.0'
springVersion = '3.1.3.RELEASE' springVersion = '3.1.3.RELEASE'
springIntegrationVersion = '2.2.0.RELEASE' springIntegrationVersion = '3.0.0.M1'
} }
eclipse { eclipse {

View File

@@ -1 +1 @@
version=0.1.0.BUILD-SNAPSHOT version=1.0.0.BUILD-SNAPSHOT

View File

@@ -1,85 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.x.ip.serializer;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.serializer.Deserializer;
import org.springframework.core.serializer.Serializer;
/**
* Base class for (de)serializers that provide a mechanism to
* reconstruct a byte array from an arbitrary stream.
*
* TODO: Enhanced version of standard class - will be merged in 3.0.
*
* @author Gary Russell
* @since 2.0
*
*/
public abstract class AbstractByteArraySerializer implements
Serializer<byte[]>,
Deserializer<byte[]> {
protected int maxMessageSize = 2048;
protected final Log logger = LogFactory.getLog(this.getClass());
/**
* The maximum supported message size for this serializer.
* Default 2048.
* @return The max message size.
*/
public int getMaxMessageSize() {
return maxMessageSize;
}
/**
* The maximum supported message size for this serializer.
* Default 2048.
* @param maxMessageSize The max message size.
*/
public void setMaxMessageSize(int maxMessageSize) {
this.maxMessageSize = maxMessageSize;
}
protected void checkClosure(int bite) throws IOException {
if (bite < 0) {
logger.debug("Socket closed during message assembly");
throw new IOException("Socket closed during message assembly");
}
}
/**
* Copy size bytes to a new buffer exactly size bytes long.
* @param buffer The buffer containing the data.
* @param size The number of bytes to copy.
* @return The new buffer, or the buffer parameter if it is
* already the correct size.
*/
protected byte[] copyToSizedArray(byte[] buffer, int size) {
if (size == buffer.length) {
return buffer;
}
byte[] assembledData = new byte[size];
System.arraycopy(buffer, 0, assembledData, 0, size);
return assembledData;
}
}

View File

@@ -26,6 +26,7 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer;
/** /**
* Base class for (de)Serializers that start with an HTTP-like protocol then * Base class for (de)Serializers that start with an HTTP-like protocol then
@@ -55,6 +56,7 @@ public abstract class AbstractHttpSwitchingDeserializer implements StatefulDeser
return crlfDeserializer; return crlfDeserializer;
} }
@Override
public abstract DataFrame deserialize(InputStream inputStream) throws IOException; public abstract DataFrame deserialize(InputStream inputStream) throws IOException;
protected BasicState getStreamState(InputStream inputStream) { protected BasicState getStreamState(InputStream inputStream) {
@@ -130,6 +132,7 @@ public abstract class AbstractHttpSwitchingDeserializer implements StatefulDeser
return new DataFrame(type, frameData); return new DataFrame(type, frameData);
} }
@Override
public void removeState(Object key) { public void removeState(Object key) {
this.streamState.remove(key); this.streamState.remove(key);
} }

View File

@@ -1,87 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.x.ip.serializer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException;
/**
* Reads data in an InputStream to a byte[]; data must be terminated by \r\n
* (not included in resulting byte[]).
* Writes a byte[] to an OutputStream and adds \r\n.
*
* TODO: Enhanced version of standard class - will be merged in 3.0.
*
* @author Gary Russell
* @since 2.0
*/
public class ByteArrayCrLfSerializer extends AbstractByteArraySerializer {
private static final byte[] CRLF = "\r\n".getBytes();
/**
* Reads the data in the inputstream to a byte[]. Data must be terminated
* by CRLF (\r\n). Throws a {@link SoftEndOfStreamException} if the stream
* is closed immediately after the \r\n (i.e. no data is in the process of
* being read).
*/
public byte[] deserialize(InputStream inputStream) throws IOException {
byte[] buffer = new byte[this.maxMessageSize];
int n = this.fillToCrLf(inputStream, buffer);
byte[] assembledData = this.copyToSizedArray(buffer, n);
return assembledData;
}
public int fillToCrLf(InputStream inputStream, byte[] buffer)
throws IOException, SoftEndOfStreamException {
int n = 0;
int bite;
if (logger.isDebugEnabled()) {
logger.debug("Available to read:" + inputStream.available());
}
while (true) {
bite = inputStream.read();
// logger.debug("Read:" + (char) bite);
if (bite < 0 && n == 0) {
throw new SoftEndOfStreamException("Stream closed between payloads");
}
checkClosure(bite);
if (n > 0 && bite == '\n' && buffer[n-1] == '\r') {
break;
}
buffer[n++] = (byte) bite;
if (n >= this.maxMessageSize) {
throw new IOException("CRLF not found before max message length: "
+ this.maxMessageSize);
}
};
return n-1; // trim \r
}
/**
* Writes the byte[] to the stream and appends \r\n.
*/
public void serialize(byte[] bytes, OutputStream outputStream) throws IOException {
outputStream.write(bytes);
outputStream.write(CRLF);
outputStream.flush();
}
}

View File

@@ -1,82 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.x.ip.tcp;
import org.springframework.context.ApplicationEvent;
import org.springframework.integration.ip.tcp.connection.TcpConnection;
/**
* ApplicationEvent representing certain operations on a {@link TcpConnection}.
*
* TODO: Borrowed from SI 3.0 - remove when M1 available
*
* @author Gary Russell
* @since 3.0
*
*/
public class TcpConnectionEvent extends ApplicationEvent {
private static final long serialVersionUID = 5323436446362192129L;
public static final String OPEN = "OPEN";
public static final String CLOSE = "CLOSE";
public static final String EXCEPTION = "EXCEPTION";
private final String type;
private final String connectionFactoryName;
private final Throwable throwable;
public TcpConnectionEvent(TcpConnection connection, String type,
String connectionFactoryName) {
super(connection);
this.type = type;
this.throwable = null;
this.connectionFactoryName = connectionFactoryName;
}
public TcpConnectionEvent(TcpConnection connection, Throwable t,
String connectionFactoryName) {
super(connection);
this.type = EXCEPTION;
this.throwable = t;
this.connectionFactoryName = connectionFactoryName;
}
public String getType() {
return type;
}
public String getConnectionId() {
return ((TcpConnection) this.getSource()).getConnectionId();
}
public String getConnectionFactoryName() {
return connectionFactoryName;
}
@Override
public String toString() {
return "TcpConnectionEvent [type=" + this.getType() +
", factory=" + this.connectionFactoryName +
", connectionId=" + this.getConnectionId() +
(this.throwable == null ? "" : ", Exception=" + this.throwable) + "]";
}
}

View File

@@ -15,8 +15,8 @@
*/ */
package org.springframework.integration.x.ip.websocket; package org.springframework.integration.x.ip.websocket;
import org.springframework.integration.ip.tcp.connection.TcpConnection; import org.springframework.integration.ip.tcp.connection.TcpConnectionEvent;
import org.springframework.integration.x.ip.tcp.TcpConnectionEvent; import org.springframework.integration.ip.tcp.connection.TcpConnectionSupport;
/** /**
* @author Gary Russell * @author Gary Russell
@@ -27,15 +27,16 @@ public class WebSocketEvent extends TcpConnectionEvent {
private static final long serialVersionUID = -6788341703196233248L; private static final long serialVersionUID = -6788341703196233248L;
public static final String HANDSHAKE_COMPLETE = "HANDSHAKE_COMPLETE"; public enum WebSocketEventType implements EventType {
HANDSHAKE_COMPLETE,
public static final String WEBSOCKET_CLOSED = "WEBSOCKET_CLOSED"; WEBSOCKET_CLOSED
}
private final String path; private final String path;
private final String queryString; private final String queryString;
public WebSocketEvent(TcpConnection connection, String type, String path, String queryString) { public WebSocketEvent(TcpConnectionSupport connection, WebSocketEventType type, String path, String queryString) {
super(connection, type, "unknown"); super(connection, type, "unknown");
this.path = path; this.path = path;
this.queryString = queryString; this.queryString = queryString;

View File

@@ -0,0 +1,44 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.x.ip.websocket;
import java.util.Map;
import org.springframework.integration.ip.tcp.connection.TcpConnection;
import org.springframework.integration.ip.tcp.connection.TcpMessageMapper;
import org.springframework.integration.x.ip.websocket.WebSocketTcpConnectionInterceptorFactory.WebSocketTcpConnectionInterceptor;
/**
* @author Gary Russell
* @since 3.0
*
*/
public class WebSocketMessageMapper extends TcpMessageMapper {
private final WebSocketTcpConnectionInterceptorFactory connectionInterceptorFactory;
public WebSocketMessageMapper(WebSocketTcpConnectionInterceptorFactory connectionInterceptorFactory) {
this.connectionInterceptorFactory = connectionInterceptorFactory;
}
@Override
protected Map<String, ?> supplyCustomHeaders(TcpConnection connection) {
WebSocketTcpConnectionInterceptor interceptor = this.connectionInterceptorFactory.locateInterceptor(connection);
return interceptor == null ? null : interceptor.getAdditionalHeaders();
}
}

View File

@@ -74,6 +74,7 @@ public class WebSocketSerializer extends AbstractHttpSwitchingDeserializer imple
return new WebSocketState(); return new WebSocketState();
} }
@Override
public void serialize(final Object frame, OutputStream outputStream) public void serialize(final Object frame, OutputStream outputStream)
throws IOException { throws IOException {
String data = ""; String data = "";
@@ -199,7 +200,7 @@ public class WebSocketSerializer extends AbstractHttpSwitchingDeserializer imple
int maskInx = 0; int maskInx = 0;
int rsv = 0; int rsv = 0;
while (!done ) { while (!done ) {
bite = inputStream.read(); bite = inputStream.read() & 0xff;
// logger.debug("Read:" + Integer.toHexString(bite)); // logger.debug("Read:" + Integer.toHexString(bite));
if (bite < 0 && n == 0) { if (bite < 0 && n == 0) {
throw new SoftEndOfStreamException("Stream closed between payloads"); throw new SoftEndOfStreamException("Stream closed between payloads");

View File

@@ -15,15 +15,12 @@
*/ */
package org.springframework.integration.x.ip.websocket; package org.springframework.integration.x.ip.websocket;
import java.io.IOException; import java.util.HashMap;
import java.io.InputStream; import java.util.Map;
import java.net.Socket; import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.serializer.Deserializer; import org.springframework.core.serializer.Deserializer;
import org.springframework.integration.Message; import org.springframework.integration.Message;
import org.springframework.integration.MessageHandlingException; import org.springframework.integration.MessageHandlingException;
@@ -34,13 +31,13 @@ import org.springframework.integration.aggregator.ResequencingMessageHandler;
import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageHandler; import org.springframework.integration.core.MessageHandler;
import org.springframework.integration.endpoint.EventDrivenConsumer; import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.ip.tcp.connection.AbstractTcpConnectionInterceptor;
import org.springframework.integration.ip.tcp.connection.TcpConnection; import org.springframework.integration.ip.tcp.connection.TcpConnection;
import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptor;
import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactory; import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactory;
import org.springframework.integration.ip.tcp.connection.TcpNetConnection; import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorSupport;
import org.springframework.integration.ip.tcp.connection.TcpConnectionSupport;
import org.springframework.integration.ip.tcp.connection.TcpNioConnection; import org.springframework.integration.ip.tcp.connection.TcpNioConnection;
import org.springframework.integration.support.MessageBuilder; import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.x.ip.websocket.WebSocketEvent.WebSocketEventType;
import org.springframework.integration.x.ip.websocket.WebSocketSerializer.WebSocketState; import org.springframework.integration.x.ip.websocket.WebSocketSerializer.WebSocketState;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@@ -49,28 +46,26 @@ import org.springframework.util.Assert;
* @since 3.0 * @since 3.0
* *
*/ */
public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionInterceptorFactory, public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionInterceptorFactory {
ApplicationEventPublisherAware {
private static final Log logger = LogFactory.getLog(WebSocketTcpConnectionInterceptor.class); private static final Log logger = LogFactory.getLog(WebSocketTcpConnectionInterceptor.class);
private volatile ApplicationEventPublisher applicationEventPublisher; private final Map<TcpConnection, WebSocketTcpConnectionInterceptor> connections =
new ConcurrentHashMap<TcpConnection, WebSocketTcpConnectionInterceptor>();
@Override @Override
public TcpConnectionInterceptor getInterceptor() { public TcpConnectionInterceptorSupport getInterceptor() {
return new WebSocketTcpConnectionInterceptor(); return new WebSocketTcpConnectionInterceptor();
} }
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { public WebSocketTcpConnectionInterceptor locateInterceptor(TcpConnection connection) {
this.applicationEventPublisher = applicationEventPublisher; return this.connections.get(connection);
} }
private class WebSocketTcpConnectionInterceptor extends AbstractTcpConnectionInterceptor { public class WebSocketTcpConnectionInterceptor extends TcpConnectionInterceptorSupport {
private volatile boolean shook; private volatile boolean shook;
private volatile InputStream theInputStream;
private final DirectChannel resequenceChannel = new DirectChannel(); private final DirectChannel resequenceChannel = new DirectChannel();
private final EventDrivenConsumer resequencer; private final EventDrivenConsumer resequencer;
@@ -114,16 +109,7 @@ public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionIn
public boolean doOnMessage(Message<?> message) { public boolean doOnMessage(Message<?> message) {
Assert.isInstanceOf(WebSocketFrame.class, message.getPayload()); Assert.isInstanceOf(WebSocketFrame.class, message.getPayload());
WebSocketFrame payload = (WebSocketFrame) message.getPayload(); WebSocketFrame payload = (WebSocketFrame) message.getPayload();
InputStream inputStream = null; WebSocketState state = getState(message);
try {
inputStream = this.getTheInputStream();
}
catch (IOException e1) {
this.protocolViolation(message);
}
WebSocketState state = (WebSocketState) this.getRequiredDeserializer().getState(inputStream);
Assert.notNull(state, "State must not be null:" + message);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(state); logger.trace(state);
} }
@@ -146,8 +132,8 @@ public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionIn
this.send(message); this.send(message);
} }
WebSocketEvent event = new WebSocketEvent(this.getTheConnection(), WebSocketEvent event = new WebSocketEvent(this.getTheConnection(),
WebSocketEvent.WEBSOCKET_CLOSED, state.getPath(), state.getQueryString()); WebSocketEventType.WEBSOCKET_CLOSED, state.getPath(), state.getQueryString());
publish(event); this.getTheConnection().publishEvent(event);
this.close(); this.close();
} }
catch (Exception e) { catch (Exception e) {
@@ -196,24 +182,15 @@ public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionIn
} }
} }
else if (this.shook) { else if (this.shook) {
MessageBuilder<?> messageBuilder = MessageBuilder.fromMessage(message); return super.onMessage(message);
// TODO: Move to subclass of TcpMessageMapper when INT-2877 is merged
if (state.getPath() != null) {
messageBuilder.setHeader(WebSocketHeaders.PATH, state.getPath());
}
if (state.getQueryString() != null) {
messageBuilder.setHeader(WebSocketHeaders.QUERY_STRING, state.getQueryString());
}
return super.onMessage(
messageBuilder.build());
} }
else { else {
try { try {
doHandshake(payload, message.getHeaders()); doHandshake(payload, message.getHeaders());
this.shook = true; this.shook = true;
WebSocketEvent event = new WebSocketEvent(this.getTheConnection(), WebSocketEvent event = new WebSocketEvent(this.getTheConnection(),
WebSocketEvent.HANDSHAKE_COMPLETE, state.getPath(), state.getQueryString()); WebSocketEventType.HANDSHAKE_COMPLETE, state.getPath(), state.getQueryString());
publish(event); this.getTheConnection().publishEvent(event);
} }
catch (Exception e) { catch (Exception e) {
throw new MessageHandlingException(message, "Handshake failed", e); throw new MessageHandlingException(message, "Handshake failed", e);
@@ -222,15 +199,13 @@ public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionIn
return true; return true;
} }
private void publish(WebSocketEvent event) { private WebSocketState getState(Object object) {
if (WebSocketTcpConnectionInterceptorFactory.this.applicationEventPublisher != null) { Object stateKey = null;
WebSocketTcpConnectionInterceptorFactory.this.applicationEventPublisher.publishEvent(event); stateKey = this.getTheConnection().getDeserializerStateKey();
} Assert.notNull(stateKey, "StateKey must not be null:" + object);
else { WebSocketState state = (WebSocketState) this.getRequiredDeserializer().getState(stateKey);
if (logger.isWarnEnabled()) { Assert.notNull(state, "State must not be null:" + object);
logger.warn("No publisher available for " + event); return state;
}
}
} }
private void protocolViolation(Message<?> message) { private void protocolViolation(Message<?> message) {
@@ -242,52 +217,32 @@ public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionIn
WebSocketFrame close = new WebSocketFrame(WebSocketFrame.TYPE_CLOSE, error); WebSocketFrame close = new WebSocketFrame(WebSocketFrame.TYPE_CLOSE, error);
close.setStatus(frame.getType() == WebSocketFrame.TYPE_INVALID_UTF8 ? (short) 1007 : (short) 1002); close.setStatus(frame.getType() == WebSocketFrame.TYPE_INVALID_UTF8 ? (short) 1007 : (short) 1002);
try { try {
((WebSocketState) this.getRequiredDeserializer().getState(this.getTheInputStream())).setCloseInitiated(true); Object stateKey = this.getTheConnection().getDeserializerStateKey();
this.send(MessageBuilder.withPayload(close) if (stateKey != null) {
.copyHeaders(message.getHeaders()) WebSocketState webSocketState = (WebSocketState) this.getRequiredDeserializer().getState(stateKey);
.build()); if (webSocketState != null) {
webSocketState.setCloseInitiated(true);
}
this.send(MessageBuilder.withPayload(close)
.copyHeaders(message.getHeaders())
.build());
}
} }
catch (Exception e) { catch (Exception e) {
throw new MessageHandlingException(message, "Send failed", e); } throw new MessageHandlingException(message, "Send failed", e);
}
} }
@Override @Override
public void close() { public void close() {
try { connections.remove(this.getTheConnection());
InputStream inputStream = getTheInputStream(); Object stateKey = this.getTheConnection().getDeserializerStateKey();
if (inputStream != null) { if (stateKey != null) {
this.getRequiredDeserializer().removeState(inputStream); this.getRequiredDeserializer().removeState(stateKey);
}
}
catch (IOException e) {
} }
super.close(); super.close();
} }
/**
* Hack - need to add getInputStream() to TcpConnection.
* @return
* @throws IOException
*/
private InputStream getTheInputStream() throws IOException {
if (this.theInputStream != null) {
return this.theInputStream;
}
TcpConnection theConnection = this.getTheConnection();
InputStream inputStream = null;
if (theConnection instanceof TcpNioConnection) {
inputStream = (InputStream) new DirectFieldAccessor(theConnection).getPropertyValue("pipedInputStream");
}
else if (theConnection instanceof TcpNetConnection) {
Socket socket = (Socket) new DirectFieldAccessor(theConnection).getPropertyValue("socket");
if (socket != null) {
inputStream = socket.getInputStream();
}
}
this.theInputStream = inputStream;
return inputStream;
}
private void doHandshake(WebSocketFrame frame, MessageHeaders messageHeaders) throws Exception { private void doHandshake(WebSocketFrame frame, MessageHeaders messageHeaders) throws Exception {
try { try {
WebSocketFrame handshake = this.getRequiredDeserializer().generateHandshake(frame); WebSocketFrame handshake = this.getRequiredDeserializer().generateHandshake(frame);
@@ -312,6 +267,24 @@ public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionIn
"Deserializer must be a WebSocketSerializer"); "Deserializer must be a WebSocketSerializer");
return (WebSocketSerializer) deserializer; return (WebSocketSerializer) deserializer;
} }
public Map<String, String> getAdditionalHeaders() {
Map<String, String> headers = new HashMap<String, String>();
WebSocketState state = this.getState(this.getConnectionId());
if (state.getPath() != null) {
headers.put(WebSocketHeaders.PATH, state.getPath());
}
if (state.getQueryString() != null) {
headers.put(WebSocketHeaders.QUERY_STRING, state.getQueryString());
}
return headers;
}
@Override
public void setTheConnection(TcpConnectionSupport theConnection) {
connections.put(theConnection, this);
super.setTheConnection(theConnection);
}
} }
} }

View File

@@ -34,7 +34,7 @@
serializer="wsSerializer" serializer="wsSerializer"
deserializer="wsSerializer" /> deserializer="wsSerializer" />
<bean id="sslCtxSup" class="org.springframework.integration.ip.tcp.connection.support.DefaultTcpSSLContextSupport"> <bean id="sslCtxSup" class="org.springframework.integration.ip.tcp.connection.DefaultTcpSSLContextSupport">
<constructor-arg value="key.store"/> <constructor-arg value="key.store"/>
<constructor-arg value="trust.store"/> <constructor-arg value="trust.store"/>
<constructor-arg value="secret"/> <constructor-arg value="secret"/>

View File

@@ -15,18 +15,24 @@
so-timeout="600000" so-timeout="600000"
interceptor-factory-chain="interceptors" interceptor-factory-chain="interceptors"
serializer="wsSerializer" serializer="wsSerializer"
deserializer="wsSerializer" /> deserializer="wsSerializer"
mapper="mapper" />
<bean id="mapper" class="org.springframework.integration.x.ip.websocket.WebSocketMessageMapper">
<constructor-arg ref="webSocketInterceptorFactory"/>
</bean>
<bean id="wsSerializer" class="org.springframework.integration.x.ip.websocket.WebSocketSerializer"> <bean id="wsSerializer" class="org.springframework.integration.x.ip.websocket.WebSocketSerializer">
<property name="server" value="true" /> <property name="server" value="true" />
</bean> </bean>
<bean id="interceptors" class="org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactoryChain"> <bean id="interceptors" class="org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactoryChain">
<property name="interceptors"> <property name="interceptors" ref="webSocketInterceptorFactory" />
<bean class="org.springframework.integration.x.ip.websocket.WebSocketTcpConnectionInterceptorFactory" />
</property>
</bean> </bean>
<bean id="webSocketInterceptorFactory"
class="org.springframework.integration.x.ip.websocket.WebSocketTcpConnectionInterceptorFactory" />
<int-ip:tcp-inbound-channel-adapter connection-factory="ws" channel="startStopChannel" /> <int-ip:tcp-inbound-channel-adapter connection-factory="ws" channel="startStopChannel" />
<bean id="service" class="org.springframework.integration.x.ip.websocket.WebSocketServerTests$DemoService" /> <bean id="service" class="org.springframework.integration.x.ip.websocket.WebSocketServerTests$DemoService" />

View File

@@ -28,8 +28,10 @@ import org.springframework.context.ApplicationListener;
import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.Message; import org.springframework.integration.Message;
import org.springframework.integration.annotation.Header; import org.springframework.integration.annotation.Header;
import org.springframework.integration.annotation.Headers;
import org.springframework.integration.ip.IpHeaders; import org.springframework.integration.ip.IpHeaders;
import org.springframework.integration.support.MessageBuilder; import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.x.ip.websocket.WebSocketEvent.WebSocketEventType;
/** /**
* @author Gary Russell * @author Gary Russell
@@ -53,7 +55,16 @@ public class WebSocketServerTests {
private final Map<String, AtomicInteger> paused = new HashMap<String, AtomicInteger>(); private final Map<String, AtomicInteger> paused = new HashMap<String, AtomicInteger>();
public void startStop(String command, @Header(IpHeaders.CONNECTION_ID) String connectionId) { public void startStop(String command, @Header(IpHeaders.CONNECTION_ID) String connectionId,
@Headers Map<String, ?> headers) {
if (headers != null) {
logger.info("Received '"
+ command
+ "' from '"
+ connectionId
+ "' path:"
+ headers.get(WebSocketHeaders.PATH) + " query-string:" + headers.get(WebSocketHeaders.QUERY_STRING));
}
if ("stop".equalsIgnoreCase(command)) { if ("stop".equalsIgnoreCase(command)) {
AtomicInteger clientInt = clients.remove(connectionId); AtomicInteger clientInt = clients.remove(connectionId);
if (clientInt != null) { if (clientInt != null) {
@@ -97,10 +108,10 @@ public class WebSocketServerTests {
@Override @Override
public void onApplicationEvent(WebSocketEvent event) { public void onApplicationEvent(WebSocketEvent event) {
logger.info(event); logger.info(event);
if (WebSocketEvent.HANDSHAKE_COMPLETE.equals(event.getType())) { if (WebSocketEventType.HANDSHAKE_COMPLETE.equals(event.getType())) {
startStop("start", event.getConnectionId()); startStop("start", event.getConnectionId(), null);
} }
else if (WebSocketEvent.WEBSOCKET_CLOSED.equals(event.getType())) { else if (WebSocketEventType.WEBSOCKET_CLOSED.equals(event.getType())) {
clients.remove(event.getConnectionId()); clients.remove(event.getConnectionId());
} }
} }

View File

@@ -25,7 +25,7 @@
<script type="text/javascript"> <script type="text/javascript">
var webSocket; var webSocket;
if (window.WebSocket) { if (window.WebSocket) {
webSocket = new WebSocket("ws://localhost:18080/myapp"); webSocket = new WebSocket("ws://localhost:18080/myapp?foo");
webSocket.onmessage = function(event) { webSocket.onmessage = function(event) {
document.getElementById("theField").value = event.data; document.getElementById("theField").value = event.data;
} }