From 870f76b58fe74db529de0cffa5dec6dd5b1c60f1 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Fri, 15 Feb 2013 13:59:39 -0500 Subject: [PATCH] INTEXT-45 SI 3.0.0.M1 - now 1.0.0.BUILD-SNAPSHOT - Use core TCP eventing for WebSocketEvents; change Type literals to enumerations - Use a WebSocketMessageMapper instead of rebuilding the message in the interceptor - Use core TCP Abstract Serializer - Use 3.0 API instead of reflection to get key for Deserializer state --- spring-integration-ip-extensions/.gitignore | 2 + spring-integration-ip-extensions/build.gradle | 2 +- .../gradle.properties | 2 +- .../AbstractByteArraySerializer.java | 85 ---------- .../AbstractHttpSwitchingDeserializer.java | 3 + .../serializer/ByteArrayCrLfSerializer.java | 87 ----------- .../x/ip/tcp/TcpConnectionEvent.java | 82 ---------- .../x/ip/websocket/WebSocketEvent.java | 13 +- .../ip/websocket/WebSocketMessageMapper.java | 44 ++++++ .../x/ip/websocket/WebSocketSerializer.java | 3 +- ...SocketTcpConnectionInterceptorFactory.java | 147 +++++++----------- .../x/ip/websocket/Autobahn-context.xml | 2 +- .../WebSocketServerTests-context.xml | 14 +- .../x/ip/websocket/WebSocketServerTests.java | 19 ++- .../integration/x/ip/websocket/ws.html | 2 +- 15 files changed, 147 insertions(+), 360 deletions(-) create mode 100644 spring-integration-ip-extensions/.gitignore delete mode 100644 spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/AbstractByteArraySerializer.java delete mode 100644 spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/ByteArrayCrLfSerializer.java delete mode 100644 spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/tcp/TcpConnectionEvent.java create mode 100644 spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketMessageMapper.java diff --git a/spring-integration-ip-extensions/.gitignore b/spring-integration-ip-extensions/.gitignore new file mode 100644 index 0000000..d0b4258 --- /dev/null +++ b/spring-integration-ip-extensions/.gitignore @@ -0,0 +1,2 @@ +reports +fuzz* diff --git a/spring-integration-ip-extensions/build.gradle b/spring-integration-ip-extensions/build.gradle index 45864a7..9b87052 100644 --- a/spring-integration-ip-extensions/build.gradle +++ b/spring-integration-ip-extensions/build.gradle @@ -33,7 +33,7 @@ ext { log4jVersion = '1.2.12' mockitoVersion = '1.9.0' springVersion = '3.1.3.RELEASE' - springIntegrationVersion = '2.2.0.RELEASE' + springIntegrationVersion = '3.0.0.M1' } eclipse { diff --git a/spring-integration-ip-extensions/gradle.properties b/spring-integration-ip-extensions/gradle.properties index 99f6735..bebfcbc 100644 --- a/spring-integration-ip-extensions/gradle.properties +++ b/spring-integration-ip-extensions/gradle.properties @@ -1 +1 @@ -version=0.1.0.BUILD-SNAPSHOT +version=1.0.0.BUILD-SNAPSHOT diff --git a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/AbstractByteArraySerializer.java b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/AbstractByteArraySerializer.java deleted file mode 100644 index 21a9c84..0000000 --- a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/AbstractByteArraySerializer.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2002-2013 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.x.ip.serializer; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.springframework.core.serializer.Deserializer; -import org.springframework.core.serializer.Serializer; - -/** - * Base class for (de)serializers that provide a mechanism to - * reconstruct a byte array from an arbitrary stream. - * - * TODO: Enhanced version of standard class - will be merged in 3.0. - * - * @author Gary Russell - * @since 2.0 - * - */ -public abstract class AbstractByteArraySerializer implements - Serializer, - Deserializer { - - protected int maxMessageSize = 2048; - - protected final Log logger = LogFactory.getLog(this.getClass()); - - /** - * The maximum supported message size for this serializer. - * Default 2048. - * @return The max message size. - */ - public int getMaxMessageSize() { - return maxMessageSize; - } - - /** - * The maximum supported message size for this serializer. - * Default 2048. - * @param maxMessageSize The max message size. - */ - public void setMaxMessageSize(int maxMessageSize) { - this.maxMessageSize = maxMessageSize; - } - - protected void checkClosure(int bite) throws IOException { - if (bite < 0) { - logger.debug("Socket closed during message assembly"); - throw new IOException("Socket closed during message assembly"); - } - } - - /** - * Copy size bytes to a new buffer exactly size bytes long. - * @param buffer The buffer containing the data. - * @param size The number of bytes to copy. - * @return The new buffer, or the buffer parameter if it is - * already the correct size. - */ - protected byte[] copyToSizedArray(byte[] buffer, int size) { - if (size == buffer.length) { - return buffer; - } - byte[] assembledData = new byte[size]; - System.arraycopy(buffer, 0, assembledData, 0, size); - return assembledData; - } - -} diff --git a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/AbstractHttpSwitchingDeserializer.java b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/AbstractHttpSwitchingDeserializer.java index 776c2e1..e3347c2 100644 --- a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/AbstractHttpSwitchingDeserializer.java +++ b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/AbstractHttpSwitchingDeserializer.java @@ -26,6 +26,7 @@ import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer; /** * Base class for (de)Serializers that start with an HTTP-like protocol then @@ -55,6 +56,7 @@ public abstract class AbstractHttpSwitchingDeserializer implements StatefulDeser return crlfDeserializer; } + @Override public abstract DataFrame deserialize(InputStream inputStream) throws IOException; protected BasicState getStreamState(InputStream inputStream) { @@ -130,6 +132,7 @@ public abstract class AbstractHttpSwitchingDeserializer implements StatefulDeser return new DataFrame(type, frameData); } + @Override public void removeState(Object key) { this.streamState.remove(key); } diff --git a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/ByteArrayCrLfSerializer.java b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/ByteArrayCrLfSerializer.java deleted file mode 100644 index 8bb831e..0000000 --- a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/serializer/ByteArrayCrLfSerializer.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright 2002-2013 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.x.ip.serializer; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException; - -/** - * Reads data in an InputStream to a byte[]; data must be terminated by \r\n - * (not included in resulting byte[]). - * Writes a byte[] to an OutputStream and adds \r\n. - * - * TODO: Enhanced version of standard class - will be merged in 3.0. - * - * @author Gary Russell - * @since 2.0 - */ -public class ByteArrayCrLfSerializer extends AbstractByteArraySerializer { - - private static final byte[] CRLF = "\r\n".getBytes(); - - /** - * Reads the data in the inputstream to a byte[]. Data must be terminated - * by CRLF (\r\n). Throws a {@link SoftEndOfStreamException} if the stream - * is closed immediately after the \r\n (i.e. no data is in the process of - * being read). - */ - public byte[] deserialize(InputStream inputStream) throws IOException { - byte[] buffer = new byte[this.maxMessageSize]; - int n = this.fillToCrLf(inputStream, buffer); - byte[] assembledData = this.copyToSizedArray(buffer, n); - return assembledData; - } - - public int fillToCrLf(InputStream inputStream, byte[] buffer) - throws IOException, SoftEndOfStreamException { - int n = 0; - int bite; - if (logger.isDebugEnabled()) { - logger.debug("Available to read:" + inputStream.available()); - } - while (true) { - bite = inputStream.read(); -// logger.debug("Read:" + (char) bite); - if (bite < 0 && n == 0) { - throw new SoftEndOfStreamException("Stream closed between payloads"); - } - checkClosure(bite); - if (n > 0 && bite == '\n' && buffer[n-1] == '\r') { - break; - } - buffer[n++] = (byte) bite; - if (n >= this.maxMessageSize) { - throw new IOException("CRLF not found before max message length: " - + this.maxMessageSize); - } - }; - return n-1; // trim \r - } - - /** - * Writes the byte[] to the stream and appends \r\n. - */ - public void serialize(byte[] bytes, OutputStream outputStream) throws IOException { - outputStream.write(bytes); - outputStream.write(CRLF); - outputStream.flush(); - } - -} diff --git a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/tcp/TcpConnectionEvent.java b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/tcp/TcpConnectionEvent.java deleted file mode 100644 index b0eef8e..0000000 --- a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/tcp/TcpConnectionEvent.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright 2002-2013 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.integration.x.ip.tcp; - -import org.springframework.context.ApplicationEvent; -import org.springframework.integration.ip.tcp.connection.TcpConnection; - -/** - * ApplicationEvent representing certain operations on a {@link TcpConnection}. - * - * TODO: Borrowed from SI 3.0 - remove when M1 available - * - * @author Gary Russell - * @since 3.0 - * - */ -public class TcpConnectionEvent extends ApplicationEvent { - - private static final long serialVersionUID = 5323436446362192129L; - - public static final String OPEN = "OPEN"; - - public static final String CLOSE = "CLOSE"; - - public static final String EXCEPTION = "EXCEPTION"; - - private final String type; - - private final String connectionFactoryName; - - private final Throwable throwable; - - public TcpConnectionEvent(TcpConnection connection, String type, - String connectionFactoryName) { - super(connection); - this.type = type; - this.throwable = null; - this.connectionFactoryName = connectionFactoryName; - } - - public TcpConnectionEvent(TcpConnection connection, Throwable t, - String connectionFactoryName) { - super(connection); - this.type = EXCEPTION; - this.throwable = t; - this.connectionFactoryName = connectionFactoryName; - } - - public String getType() { - return type; - } - - public String getConnectionId() { - return ((TcpConnection) this.getSource()).getConnectionId(); - } - - public String getConnectionFactoryName() { - return connectionFactoryName; - } - - @Override - public String toString() { - return "TcpConnectionEvent [type=" + this.getType() + - ", factory=" + this.connectionFactoryName + - ", connectionId=" + this.getConnectionId() + - (this.throwable == null ? "" : ", Exception=" + this.throwable) + "]"; - } - -} diff --git a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketEvent.java b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketEvent.java index 8130fbc..c3a8462 100644 --- a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketEvent.java +++ b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketEvent.java @@ -15,8 +15,8 @@ */ package org.springframework.integration.x.ip.websocket; -import org.springframework.integration.ip.tcp.connection.TcpConnection; -import org.springframework.integration.x.ip.tcp.TcpConnectionEvent; +import org.springframework.integration.ip.tcp.connection.TcpConnectionEvent; +import org.springframework.integration.ip.tcp.connection.TcpConnectionSupport; /** * @author Gary Russell @@ -27,15 +27,16 @@ public class WebSocketEvent extends TcpConnectionEvent { private static final long serialVersionUID = -6788341703196233248L; - public static final String HANDSHAKE_COMPLETE = "HANDSHAKE_COMPLETE"; - - public static final String WEBSOCKET_CLOSED = "WEBSOCKET_CLOSED"; + public enum WebSocketEventType implements EventType { + HANDSHAKE_COMPLETE, + WEBSOCKET_CLOSED + } private final String path; private final String queryString; - public WebSocketEvent(TcpConnection connection, String type, String path, String queryString) { + public WebSocketEvent(TcpConnectionSupport connection, WebSocketEventType type, String path, String queryString) { super(connection, type, "unknown"); this.path = path; this.queryString = queryString; diff --git a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketMessageMapper.java b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketMessageMapper.java new file mode 100644 index 0000000..e65d637 --- /dev/null +++ b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketMessageMapper.java @@ -0,0 +1,44 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.x.ip.websocket; + +import java.util.Map; + +import org.springframework.integration.ip.tcp.connection.TcpConnection; +import org.springframework.integration.ip.tcp.connection.TcpMessageMapper; +import org.springframework.integration.x.ip.websocket.WebSocketTcpConnectionInterceptorFactory.WebSocketTcpConnectionInterceptor; + +/** + * @author Gary Russell + * @since 3.0 + * + */ +public class WebSocketMessageMapper extends TcpMessageMapper { + + private final WebSocketTcpConnectionInterceptorFactory connectionInterceptorFactory; + + public WebSocketMessageMapper(WebSocketTcpConnectionInterceptorFactory connectionInterceptorFactory) { + this.connectionInterceptorFactory = connectionInterceptorFactory; + } + + @Override + protected Map supplyCustomHeaders(TcpConnection connection) { + WebSocketTcpConnectionInterceptor interceptor = this.connectionInterceptorFactory.locateInterceptor(connection); + return interceptor == null ? null : interceptor.getAdditionalHeaders(); + } + + +} diff --git a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketSerializer.java b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketSerializer.java index 365d3d7..809f405 100644 --- a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketSerializer.java +++ b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketSerializer.java @@ -74,6 +74,7 @@ public class WebSocketSerializer extends AbstractHttpSwitchingDeserializer imple return new WebSocketState(); } + @Override public void serialize(final Object frame, OutputStream outputStream) throws IOException { String data = ""; @@ -199,7 +200,7 @@ public class WebSocketSerializer extends AbstractHttpSwitchingDeserializer imple int maskInx = 0; int rsv = 0; while (!done ) { - bite = inputStream.read(); + bite = inputStream.read() & 0xff; // logger.debug("Read:" + Integer.toHexString(bite)); if (bite < 0 && n == 0) { throw new SoftEndOfStreamException("Stream closed between payloads"); diff --git a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketTcpConnectionInterceptorFactory.java b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketTcpConnectionInterceptorFactory.java index 90dc770..fac6b63 100644 --- a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketTcpConnectionInterceptorFactory.java +++ b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketTcpConnectionInterceptorFactory.java @@ -15,15 +15,12 @@ */ package org.springframework.integration.x.ip.websocket; -import java.io.IOException; -import java.io.InputStream; -import java.net.Socket; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.springframework.beans.DirectFieldAccessor; -import org.springframework.context.ApplicationEventPublisher; -import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.core.serializer.Deserializer; import org.springframework.integration.Message; import org.springframework.integration.MessageHandlingException; @@ -34,13 +31,13 @@ import org.springframework.integration.aggregator.ResequencingMessageHandler; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageHandler; import org.springframework.integration.endpoint.EventDrivenConsumer; -import org.springframework.integration.ip.tcp.connection.AbstractTcpConnectionInterceptor; import org.springframework.integration.ip.tcp.connection.TcpConnection; -import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptor; import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactory; -import org.springframework.integration.ip.tcp.connection.TcpNetConnection; +import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorSupport; +import org.springframework.integration.ip.tcp.connection.TcpConnectionSupport; import org.springframework.integration.ip.tcp.connection.TcpNioConnection; import org.springframework.integration.support.MessageBuilder; +import org.springframework.integration.x.ip.websocket.WebSocketEvent.WebSocketEventType; import org.springframework.integration.x.ip.websocket.WebSocketSerializer.WebSocketState; import org.springframework.util.Assert; @@ -49,28 +46,26 @@ import org.springframework.util.Assert; * @since 3.0 * */ -public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionInterceptorFactory, - ApplicationEventPublisherAware { +public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionInterceptorFactory { private static final Log logger = LogFactory.getLog(WebSocketTcpConnectionInterceptor.class); - private volatile ApplicationEventPublisher applicationEventPublisher; + private final Map connections = + new ConcurrentHashMap(); @Override - public TcpConnectionInterceptor getInterceptor() { + public TcpConnectionInterceptorSupport getInterceptor() { return new WebSocketTcpConnectionInterceptor(); } - public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { - this.applicationEventPublisher = applicationEventPublisher; + public WebSocketTcpConnectionInterceptor locateInterceptor(TcpConnection connection) { + return this.connections.get(connection); } - private class WebSocketTcpConnectionInterceptor extends AbstractTcpConnectionInterceptor { + public class WebSocketTcpConnectionInterceptor extends TcpConnectionInterceptorSupport { private volatile boolean shook; - private volatile InputStream theInputStream; - private final DirectChannel resequenceChannel = new DirectChannel(); private final EventDrivenConsumer resequencer; @@ -114,16 +109,7 @@ public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionIn public boolean doOnMessage(Message message) { Assert.isInstanceOf(WebSocketFrame.class, message.getPayload()); WebSocketFrame payload = (WebSocketFrame) message.getPayload(); - InputStream inputStream = null; - try { - inputStream = this.getTheInputStream(); - } - catch (IOException e1) { - this.protocolViolation(message); - } - - WebSocketState state = (WebSocketState) this.getRequiredDeserializer().getState(inputStream); - Assert.notNull(state, "State must not be null:" + message); + WebSocketState state = getState(message); if (logger.isTraceEnabled()) { logger.trace(state); } @@ -146,8 +132,8 @@ public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionIn this.send(message); } WebSocketEvent event = new WebSocketEvent(this.getTheConnection(), - WebSocketEvent.WEBSOCKET_CLOSED, state.getPath(), state.getQueryString()); - publish(event); + WebSocketEventType.WEBSOCKET_CLOSED, state.getPath(), state.getQueryString()); + this.getTheConnection().publishEvent(event); this.close(); } catch (Exception e) { @@ -196,24 +182,15 @@ public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionIn } } else if (this.shook) { - MessageBuilder messageBuilder = MessageBuilder.fromMessage(message); - // TODO: Move to subclass of TcpMessageMapper when INT-2877 is merged - if (state.getPath() != null) { - messageBuilder.setHeader(WebSocketHeaders.PATH, state.getPath()); - } - if (state.getQueryString() != null) { - messageBuilder.setHeader(WebSocketHeaders.QUERY_STRING, state.getQueryString()); - } - return super.onMessage( - messageBuilder.build()); + return super.onMessage(message); } else { try { doHandshake(payload, message.getHeaders()); this.shook = true; WebSocketEvent event = new WebSocketEvent(this.getTheConnection(), - WebSocketEvent.HANDSHAKE_COMPLETE, state.getPath(), state.getQueryString()); - publish(event); + WebSocketEventType.HANDSHAKE_COMPLETE, state.getPath(), state.getQueryString()); + this.getTheConnection().publishEvent(event); } catch (Exception e) { throw new MessageHandlingException(message, "Handshake failed", e); @@ -222,15 +199,13 @@ public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionIn return true; } - private void publish(WebSocketEvent event) { - if (WebSocketTcpConnectionInterceptorFactory.this.applicationEventPublisher != null) { - WebSocketTcpConnectionInterceptorFactory.this.applicationEventPublisher.publishEvent(event); - } - else { - if (logger.isWarnEnabled()) { - logger.warn("No publisher available for " + event); - } - } + private WebSocketState getState(Object object) { + Object stateKey = null; + stateKey = this.getTheConnection().getDeserializerStateKey(); + Assert.notNull(stateKey, "StateKey must not be null:" + object); + WebSocketState state = (WebSocketState) this.getRequiredDeserializer().getState(stateKey); + Assert.notNull(state, "State must not be null:" + object); + return state; } private void protocolViolation(Message message) { @@ -242,52 +217,32 @@ public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionIn WebSocketFrame close = new WebSocketFrame(WebSocketFrame.TYPE_CLOSE, error); close.setStatus(frame.getType() == WebSocketFrame.TYPE_INVALID_UTF8 ? (short) 1007 : (short) 1002); try { - ((WebSocketState) this.getRequiredDeserializer().getState(this.getTheInputStream())).setCloseInitiated(true); - this.send(MessageBuilder.withPayload(close) - .copyHeaders(message.getHeaders()) - .build()); + Object stateKey = this.getTheConnection().getDeserializerStateKey(); + if (stateKey != null) { + WebSocketState webSocketState = (WebSocketState) this.getRequiredDeserializer().getState(stateKey); + if (webSocketState != null) { + webSocketState.setCloseInitiated(true); + } + this.send(MessageBuilder.withPayload(close) + .copyHeaders(message.getHeaders()) + .build()); + } } catch (Exception e) { - throw new MessageHandlingException(message, "Send failed", e); } + throw new MessageHandlingException(message, "Send failed", e); + } } @Override public void close() { - try { - InputStream inputStream = getTheInputStream(); - if (inputStream != null) { - this.getRequiredDeserializer().removeState(inputStream); - } - } - catch (IOException e) { + connections.remove(this.getTheConnection()); + Object stateKey = this.getTheConnection().getDeserializerStateKey(); + if (stateKey != null) { + this.getRequiredDeserializer().removeState(stateKey); } super.close(); } - /** - * Hack - need to add getInputStream() to TcpConnection. - * @return - * @throws IOException - */ - private InputStream getTheInputStream() throws IOException { - if (this.theInputStream != null) { - return this.theInputStream; - } - TcpConnection theConnection = this.getTheConnection(); - InputStream inputStream = null; - if (theConnection instanceof TcpNioConnection) { - inputStream = (InputStream) new DirectFieldAccessor(theConnection).getPropertyValue("pipedInputStream"); - } - else if (theConnection instanceof TcpNetConnection) { - Socket socket = (Socket) new DirectFieldAccessor(theConnection).getPropertyValue("socket"); - if (socket != null) { - inputStream = socket.getInputStream(); - } - } - this.theInputStream = inputStream; - return inputStream; - } - private void doHandshake(WebSocketFrame frame, MessageHeaders messageHeaders) throws Exception { try { WebSocketFrame handshake = this.getRequiredDeserializer().generateHandshake(frame); @@ -312,6 +267,24 @@ public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionIn "Deserializer must be a WebSocketSerializer"); return (WebSocketSerializer) deserializer; } + + public Map getAdditionalHeaders() { + Map headers = new HashMap(); + WebSocketState state = this.getState(this.getConnectionId()); + if (state.getPath() != null) { + headers.put(WebSocketHeaders.PATH, state.getPath()); + } + if (state.getQueryString() != null) { + headers.put(WebSocketHeaders.QUERY_STRING, state.getQueryString()); + } + return headers; + } + + @Override + public void setTheConnection(TcpConnectionSupport theConnection) { + connections.put(theConnection, this); + super.setTheConnection(theConnection); + } } } diff --git a/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/websocket/Autobahn-context.xml b/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/websocket/Autobahn-context.xml index 847549a..d9082b2 100644 --- a/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/websocket/Autobahn-context.xml +++ b/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/websocket/Autobahn-context.xml @@ -34,7 +34,7 @@ serializer="wsSerializer" deserializer="wsSerializer" /> - + diff --git a/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/websocket/WebSocketServerTests-context.xml b/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/websocket/WebSocketServerTests-context.xml index efa6f27..f09d0d8 100644 --- a/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/websocket/WebSocketServerTests-context.xml +++ b/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/websocket/WebSocketServerTests-context.xml @@ -15,18 +15,24 @@ so-timeout="600000" interceptor-factory-chain="interceptors" serializer="wsSerializer" - deserializer="wsSerializer" /> + deserializer="wsSerializer" + mapper="mapper" /> + + + + - - - + + + diff --git a/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/websocket/WebSocketServerTests.java b/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/websocket/WebSocketServerTests.java index 9aa165a..fc19591 100644 --- a/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/websocket/WebSocketServerTests.java +++ b/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/websocket/WebSocketServerTests.java @@ -28,8 +28,10 @@ import org.springframework.context.ApplicationListener; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.Message; import org.springframework.integration.annotation.Header; +import org.springframework.integration.annotation.Headers; import org.springframework.integration.ip.IpHeaders; import org.springframework.integration.support.MessageBuilder; +import org.springframework.integration.x.ip.websocket.WebSocketEvent.WebSocketEventType; /** * @author Gary Russell @@ -53,7 +55,16 @@ public class WebSocketServerTests { private final Map paused = new HashMap(); - public void startStop(String command, @Header(IpHeaders.CONNECTION_ID) String connectionId) { + public void startStop(String command, @Header(IpHeaders.CONNECTION_ID) String connectionId, + @Headers Map headers) { + if (headers != null) { + logger.info("Received '" + + command + + "' from '" + + connectionId + + "' path:" + + headers.get(WebSocketHeaders.PATH) + " query-string:" + headers.get(WebSocketHeaders.QUERY_STRING)); + } if ("stop".equalsIgnoreCase(command)) { AtomicInteger clientInt = clients.remove(connectionId); if (clientInt != null) { @@ -97,10 +108,10 @@ public class WebSocketServerTests { @Override public void onApplicationEvent(WebSocketEvent event) { logger.info(event); - if (WebSocketEvent.HANDSHAKE_COMPLETE.equals(event.getType())) { - startStop("start", event.getConnectionId()); + if (WebSocketEventType.HANDSHAKE_COMPLETE.equals(event.getType())) { + startStop("start", event.getConnectionId(), null); } - else if (WebSocketEvent.WEBSOCKET_CLOSED.equals(event.getType())) { + else if (WebSocketEventType.WEBSOCKET_CLOSED.equals(event.getType())) { clients.remove(event.getConnectionId()); } } diff --git a/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/websocket/ws.html b/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/websocket/ws.html index 7145fc0..3066292 100644 --- a/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/websocket/ws.html +++ b/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/websocket/ws.html @@ -25,7 +25,7 @@