Add WebSocket Events
Building on TCP Event publishing in S.I. 3.0, add events for 'HANDSHAKE_COMPLETE' and 'WEBSOCKET_CLOSED'. Update DemoService to consume these events. Change ws.html demo so that the service is started immediately and the 'command' box is now prefilled with 'stop'. When closing the websocket, the DemoService immediately removes the connection from its map, rather than waiting for an exception on the write.
This commit is contained in:
committed by
Mark Fisher
parent
873ff937a1
commit
d66b3c4794
@@ -0,0 +1,82 @@
|
||||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.integration.x.ip.tcp;
|
||||
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
import org.springframework.integration.ip.tcp.connection.TcpConnection;
|
||||
|
||||
/**
|
||||
* ApplicationEvent representing certain operations on a {@link TcpConnection}.
|
||||
*
|
||||
* TODO: Borrowed from SI 3.0 - remove when M1 available
|
||||
*
|
||||
* @author Gary Russell
|
||||
* @since 3.0
|
||||
*
|
||||
*/
|
||||
public class TcpConnectionEvent extends ApplicationEvent {
|
||||
|
||||
private static final long serialVersionUID = 5323436446362192129L;
|
||||
|
||||
public static final String OPEN = "OPEN";
|
||||
|
||||
public static final String CLOSE = "CLOSE";
|
||||
|
||||
public static final String EXCEPTION = "EXCEPTION";
|
||||
|
||||
private final String type;
|
||||
|
||||
private final String connectionFactoryName;
|
||||
|
||||
private final Throwable throwable;
|
||||
|
||||
public TcpConnectionEvent(TcpConnection connection, String type,
|
||||
String connectionFactoryName) {
|
||||
super(connection);
|
||||
this.type = type;
|
||||
this.throwable = null;
|
||||
this.connectionFactoryName = connectionFactoryName;
|
||||
}
|
||||
|
||||
public TcpConnectionEvent(TcpConnection connection, Throwable t,
|
||||
String connectionFactoryName) {
|
||||
super(connection);
|
||||
this.type = EXCEPTION;
|
||||
this.throwable = t;
|
||||
this.connectionFactoryName = connectionFactoryName;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public String getConnectionId() {
|
||||
return ((TcpConnection) this.getSource()).getConnectionId();
|
||||
}
|
||||
|
||||
public String getConnectionFactoryName() {
|
||||
return connectionFactoryName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TcpConnectionEvent [type=" + this.getType() +
|
||||
", factory=" + this.connectionFactoryName +
|
||||
", connectionId=" + this.getConnectionId() +
|
||||
(this.throwable == null ? "" : ", Exception=" + this.throwable) + "]";
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,58 @@
|
||||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.integration.x.ip.websocket;
|
||||
|
||||
import org.springframework.integration.ip.tcp.connection.TcpConnection;
|
||||
import org.springframework.integration.x.ip.tcp.TcpConnectionEvent;
|
||||
|
||||
/**
|
||||
* @author Gary Russell
|
||||
* @since 3.0
|
||||
*
|
||||
*/
|
||||
public class WebSocketEvent extends TcpConnectionEvent {
|
||||
|
||||
private static final long serialVersionUID = -6788341703196233248L;
|
||||
|
||||
public static final String HANDSHAKE_COMPLETE = "HANDSHAKE_COMPLETE";
|
||||
|
||||
public static final String WEBSOCKET_CLOSED = "WEBSOCKET_CLOSED";
|
||||
|
||||
private final String path;
|
||||
|
||||
private final String queryString;
|
||||
|
||||
public WebSocketEvent(TcpConnection connection, String type, String path, String queryString) {
|
||||
super(connection, type, "unknown");
|
||||
this.path = path;
|
||||
this.queryString = queryString;
|
||||
}
|
||||
|
||||
protected String getPath() {
|
||||
return path;
|
||||
}
|
||||
|
||||
protected String getQueryString() {
|
||||
return queryString;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return super.toString().replace("TcpConnectionEvent", "WebSocketEvent")
|
||||
.replace("]", ", path=" + this.path + ", queryString=" + this.queryString + "]");
|
||||
}
|
||||
|
||||
}
|
||||
@@ -22,6 +22,8 @@ import java.net.Socket;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.beans.DirectFieldAccessor;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.context.ApplicationEventPublisherAware;
|
||||
import org.springframework.core.serializer.Deserializer;
|
||||
import org.springframework.integration.Message;
|
||||
import org.springframework.integration.MessageHandlingException;
|
||||
@@ -47,15 +49,22 @@ import org.springframework.util.Assert;
|
||||
* @since 3.0
|
||||
*
|
||||
*/
|
||||
public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionInterceptorFactory {
|
||||
public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionInterceptorFactory,
|
||||
ApplicationEventPublisherAware {
|
||||
|
||||
private static final Log logger = LogFactory.getLog(WebSocketTcpConnectionInterceptor.class);
|
||||
|
||||
private volatile ApplicationEventPublisher applicationEventPublisher;
|
||||
|
||||
@Override
|
||||
public TcpConnectionInterceptor getInterceptor() {
|
||||
return new WebSocketTcpConnectionInterceptor();
|
||||
}
|
||||
|
||||
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
|
||||
this.applicationEventPublisher = applicationEventPublisher;
|
||||
}
|
||||
|
||||
private class WebSocketTcpConnectionInterceptor extends AbstractTcpConnectionInterceptor {
|
||||
|
||||
private volatile boolean shook;
|
||||
@@ -136,6 +145,9 @@ public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionIn
|
||||
}
|
||||
this.send(message);
|
||||
}
|
||||
WebSocketEvent event = new WebSocketEvent(this.getTheConnection(),
|
||||
WebSocketEvent.WEBSOCKET_CLOSED, state.getPath(), state.getQueryString());
|
||||
publish(event);
|
||||
this.close();
|
||||
}
|
||||
catch (Exception e) {
|
||||
@@ -199,6 +211,9 @@ public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionIn
|
||||
try {
|
||||
doHandshake(payload, message.getHeaders());
|
||||
this.shook = true;
|
||||
WebSocketEvent event = new WebSocketEvent(this.getTheConnection(),
|
||||
WebSocketEvent.HANDSHAKE_COMPLETE, state.getPath(), state.getQueryString());
|
||||
publish(event);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new MessageHandlingException(message, "Handshake failed", e);
|
||||
@@ -207,6 +222,17 @@ public class WebSocketTcpConnectionInterceptorFactory implements TcpConnectionIn
|
||||
return true;
|
||||
}
|
||||
|
||||
private void publish(WebSocketEvent event) {
|
||||
if (WebSocketTcpConnectionInterceptorFactory.this.applicationEventPublisher != null) {
|
||||
WebSocketTcpConnectionInterceptorFactory.this.applicationEventPublisher.publishEvent(event);
|
||||
}
|
||||
else {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn("No publisher available for " + event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void protocolViolation(Message<?> message) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Protocol violation - closing; " + message);
|
||||
|
||||
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.context.support.ClassPathXmlApplicationContext;
|
||||
import org.springframework.integration.Message;
|
||||
import org.springframework.integration.annotation.Header;
|
||||
@@ -44,7 +45,7 @@ public class WebSocketServerTests {
|
||||
System.exit(0);
|
||||
}
|
||||
|
||||
public static class DemoService {
|
||||
public static class DemoService implements ApplicationListener<WebSocketEvent> {
|
||||
|
||||
private static final Log logger = LogFactory.getLog(DemoService.class);
|
||||
|
||||
@@ -92,6 +93,17 @@ public class WebSocketServerTests {
|
||||
logger.warn("Error on write; removing " + connetionId);
|
||||
clients.remove(connetionId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(WebSocketEvent event) {
|
||||
logger.info(event);
|
||||
if (WebSocketEvent.HANDSHAKE_COMPLETE.equals(event.getType())) {
|
||||
startStop("start", event.getConnectionId());
|
||||
}
|
||||
else if (WebSocketEvent.WEBSOCKET_CLOSED.equals(event.getType())) {
|
||||
clients.remove(event.getConnectionId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -68,7 +68,7 @@
|
||||
<center>
|
||||
<form onsubmit="return false;" id="form">
|
||||
<br/><br/>
|
||||
<input type="text" id="message" name="message" value="start" />
|
||||
<input type="text" id="message" name="message" value="stop" />
|
||||
<br/><br/>
|
||||
<input type="button" value="Send To Spring Integration" onclick="sendToSI(this.form.message.value)" align="center"/>
|
||||
<br/><br/>
|
||||
|
||||
Reference in New Issue
Block a user