diff --git a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/tcp/TcpConnectionEvent.java b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/tcp/TcpConnectionEvent.java new file mode 100644 index 0000000..b0eef8e --- /dev/null +++ b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/tcp/TcpConnectionEvent.java @@ -0,0 +1,82 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.x.ip.tcp; + +import org.springframework.context.ApplicationEvent; +import org.springframework.integration.ip.tcp.connection.TcpConnection; + +/** + * ApplicationEvent representing certain operations on a {@link TcpConnection}. + * + * TODO: Borrowed from SI 3.0 - remove when M1 available + * + * @author Gary Russell + * @since 3.0 + * + */ +public class TcpConnectionEvent extends ApplicationEvent { + + private static final long serialVersionUID = 5323436446362192129L; + + public static final String OPEN = "OPEN"; + + public static final String CLOSE = "CLOSE"; + + public static final String EXCEPTION = "EXCEPTION"; + + private final String type; + + private final String connectionFactoryName; + + private final Throwable throwable; + + public TcpConnectionEvent(TcpConnection connection, String type, + String connectionFactoryName) { + super(connection); + this.type = type; + this.throwable = null; + this.connectionFactoryName = connectionFactoryName; + } + + public TcpConnectionEvent(TcpConnection connection, Throwable t, + String connectionFactoryName) { + super(connection); + this.type = EXCEPTION; + this.throwable = t; + this.connectionFactoryName = connectionFactoryName; + } + + public String getType() { + return type; + } + + public String getConnectionId() { + return ((TcpConnection) this.getSource()).getConnectionId(); + } + + public String getConnectionFactoryName() { + return connectionFactoryName; + } + + @Override + public String toString() { + return "TcpConnectionEvent [type=" + this.getType() + + ", factory=" + this.connectionFactoryName + + ", connectionId=" + this.getConnectionId() + + (this.throwable == null ? "" : ", Exception=" + this.throwable) + "]"; + } + +} diff --git a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketEvent.java b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketEvent.java new file mode 100644 index 0000000..4a3a891 --- /dev/null +++ b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketEvent.java @@ -0,0 +1,58 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.x.ip.websocket; + +import org.springframework.integration.ip.tcp.connection.TcpConnection; +import org.springframework.integration.x.ip.tcp.TcpConnectionEvent; + +/** + * @author Gary Russell + * @since 3.0 + * + */ +public class WebSocketEvent extends TcpConnectionEvent { + + private static final long serialVersionUID = -6788341703196233248L; + + public static final String HANDSHAKE_COMPLETE = "HANDSHAKE_COMPLETE"; + + public static final String WEBSOCKET_CLOSED = "WEBSOCKET_CLOSED"; + + private final String path; + + private final String queryString; + + public WebSocketEvent(TcpConnection connection, String type, String path, String queryString) { + super(connection, type, "unknown"); + this.path = path; + this.queryString = queryString; + } + + protected String getPath() { + return path; + } + + protected String getQueryString() { + return queryString; + } + + @Override + public String toString() { + return super.toString().replace("TcpConnectionEvent", "WebSocketEvent") + .replace("]", ", path=" + this.path + ", queryString=" + this.queryString + "]"); + } + +} diff --git a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketTcpConnectionInterceptorFactory.java b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketTcpConnectionInterceptorFactory.java index 7378f59..90dc770 100644 --- a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketTcpConnectionInterceptorFactory.java +++ b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketTcpConnectionInterceptorFactory.java @@ -22,6 +22,8 @@ import java.net.Socket; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.DirectFieldAccessor; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.core.serializer.Deserializer; import org.springframework.integration.Message; import org.springframework.integration.MessageHandlingException; @@ -47,15 +49,22 @@ import org.springframework.util.Assert; * @since 3.0 * */ -public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionInterceptorFactory { +public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionInterceptorFactory, + ApplicationEventPublisherAware { private static final Log logger = LogFactory.getLog(WebSocketTcpConnectionInterceptor.class); + private volatile ApplicationEventPublisher applicationEventPublisher; + @Override public TcpConnectionInterceptor getInterceptor() { return new WebSocketTcpConnectionInterceptor(); } + public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { + this.applicationEventPublisher = applicationEventPublisher; + } + private class WebSocketTcpConnectionInterceptor extends AbstractTcpConnectionInterceptor { private volatile boolean shook; @@ -136,6 +145,9 @@ public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionIn } this.send(message); } + WebSocketEvent event = new WebSocketEvent(this.getTheConnection(), + WebSocketEvent.WEBSOCKET_CLOSED, state.getPath(), state.getQueryString()); + publish(event); this.close(); } catch (Exception e) { @@ -199,6 +211,9 @@ public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionIn try { doHandshake(payload, message.getHeaders()); this.shook = true; + WebSocketEvent event = new WebSocketEvent(this.getTheConnection(), + WebSocketEvent.HANDSHAKE_COMPLETE, state.getPath(), state.getQueryString()); + publish(event); } catch (Exception e) { throw new MessageHandlingException(message, "Handshake failed", e); @@ -207,6 +222,17 @@ public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionIn return true; } + private void publish(WebSocketEvent event) { + if (WebSocketTcpConnectionInterceptorFactory.this.applicationEventPublisher != null) { + WebSocketTcpConnectionInterceptorFactory.this.applicationEventPublisher.publishEvent(event); + } + else { + if (logger.isWarnEnabled()) { + logger.warn("No publisher available for " + event); + } + } + } + private void protocolViolation(Message message) { if (logger.isDebugEnabled()) { logger.debug("Protocol violation - closing; " + message); diff --git a/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/websocket/WebSocketServerTests.java b/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/websocket/WebSocketServerTests.java index 9d1f553..9aa165a 100644 --- a/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/websocket/WebSocketServerTests.java +++ b/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/websocket/WebSocketServerTests.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.context.ApplicationListener; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.Message; import org.springframework.integration.annotation.Header; @@ -44,7 +45,7 @@ public class WebSocketServerTests { System.exit(0); } - public static class DemoService { + public static class DemoService implements ApplicationListener { private static final Log logger = LogFactory.getLog(DemoService.class); @@ -92,6 +93,17 @@ public class WebSocketServerTests { logger.warn("Error on write; removing " + connetionId); clients.remove(connetionId); } + + @Override + public void onApplicationEvent(WebSocketEvent event) { + logger.info(event); + if (WebSocketEvent.HANDSHAKE_COMPLETE.equals(event.getType())) { + startStop("start", event.getConnectionId()); + } + else if (WebSocketEvent.WEBSOCKET_CLOSED.equals(event.getType())) { + clients.remove(event.getConnectionId()); + } + } } } diff --git a/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/websocket/ws.html b/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/websocket/ws.html index 80904e2..7145fc0 100644 --- a/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/websocket/ws.html +++ b/spring-integration-ip-extensions/src/test/java/org/springframework/integration/x/ip/websocket/ws.html @@ -68,7 +68,7 @@


- +