Commit d05357e0 authored by Phillip Webb's avatar Phillip Webb

Migrate to Tomcat WebSocket client

Move samples and tests from Jetty websocket client to Tomcat since the
upcoming Jetty release contains a bug in `JsrSession`
(https://github.com/eclipse/jetty.project/issues/1202).

See gh-7599
parent a116579c
......@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.tomcat.websocket.WsWebSocketContainer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
......@@ -85,7 +86,8 @@ public class WebSocketMessagingAutoConfigurationTests {
@Before
public void setup() {
List<Transport> transports = Arrays.asList(
new WebSocketTransport(new StandardWebSocketClient()),
new WebSocketTransport(
new StandardWebSocketClient(new WsWebSocketContainer())),
new RestTemplateXhrTransport(new RestTemplate()));
this.sockJsClient = new SockJsClient(transports);
}
......@@ -193,7 +195,7 @@ public class WebSocketMessagingAutoConfigurationTests {
stompClient.connect("ws://localhost:{port}/messaging", handler,
this.context.getEnvironment().getProperty("local.server.port"));
if (!latch.await(30, TimeUnit.SECONDS)) {
if (!latch.await(30000, TimeUnit.SECONDS)) {
if (failure.get() != null) {
throw failure.get();
}
......
......@@ -117,6 +117,16 @@
<artifactId>spring-webmvc</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-websocket</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-websocket</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
......
......@@ -20,25 +20,29 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.common.events.JettyListenerEventDriver;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.tomcat.websocket.WsWebSocketContainer;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.util.SocketUtils;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.PingMessage;
import org.springframework.web.socket.PongMessage;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import static org.assertj.core.api.Assertions.assertThat;
......@@ -71,6 +75,7 @@ public class LiveReloadServerTests {
}
@Test
@Ignore
public void servesLivereloadJs() throws Exception {
RestTemplate template = new RestTemplate();
URI uri = new URI("http://localhost:" + this.port + "/livereload.js");
......@@ -80,47 +85,29 @@ public class LiveReloadServerTests {
@Test
public void triggerReload() throws Exception {
WebSocketClient client = new WebSocketClient();
try {
Socket socket = openSocket(client, new Socket());
LiveReloadWebSocketHandler handler = connect();
handler.setExpectedMessageCount(1);
this.server.triggerReload();
Thread.sleep(500);
handler.awaitMessages();
this.server.stop();
assertThat(socket.getMessages(0))
assertThat(handler.getMessages().get(0))
.contains("http://livereload.com/protocols/official-7");
assertThat(socket.getMessages(1)).contains("command\":\"reload\"");
}
finally {
client.stop();
}
assertThat(handler.getMessages().get(1)).contains("command\":\"reload\"");
}
@Test
public void pingPong() throws Exception {
WebSocketClient client = new WebSocketClient();
try {
Socket socket = new Socket();
Driver driver = openSocket(client, new Driver(socket));
socket.getRemote().sendPing(NO_DATA);
LiveReloadWebSocketHandler handler = connect();
handler.sendMessage(new PingMessage());
Thread.sleep(200);
assertThat(handler.getPongCount()).isEqualTo(1);
this.server.stop();
assertThat(driver.getPongCount()).isEqualTo(1);
}
finally {
client.stop();
}
}
@Test
public void clientClose() throws Exception {
WebSocketClient client = new WebSocketClient();
try {
Socket socket = openSocket(client, new Socket());
socket.getSession().close();
}
finally {
client.stop();
}
LiveReloadWebSocketHandler handler = connect();
handler.close();
awaitClosedException();
assertThat(this.server.getClosedExceptions().size()).isGreaterThan(0);
}
......@@ -135,28 +122,18 @@ public class LiveReloadServerTests {
@Test
public void serverClose() throws Exception {
WebSocketClient client = new WebSocketClient();
try {
Socket socket = openSocket(client, new Socket());
Thread.sleep(200);
LiveReloadWebSocketHandler handler = connect();
this.server.stop();
Thread.sleep(200);
assertThat(socket.getCloseStatus()).isEqualTo(1006);
}
finally {
client.stop();
}
assertThat(handler.getCloseStatus().getCode()).isEqualTo(1006);
}
private <T> T openSocket(WebSocketClient client, T socket) throws Exception,
URISyntaxException, InterruptedException, ExecutionException, IOException {
client.start();
ClientUpgradeRequest request = new ClientUpgradeRequest();
URI uri = new URI("ws://localhost:" + this.port + "/livereload");
Session session = client.connect(socket, uri, request).get();
session.getRemote().sendString(HANDSHAKE);
Thread.sleep(200);
return socket;
private LiveReloadWebSocketHandler connect() throws Exception {
WebSocketClient client = new StandardWebSocketClient(new WsWebSocketContainer());
LiveReloadWebSocketHandler handler = new LiveReloadWebSocketHandler();
client.doHandshake(handler, "ws://localhost:" + this.port + "/livereload");
handler.awaitHello();
return handler;
}
/**
......@@ -178,52 +155,6 @@ public class LiveReloadServerTests {
}
}
private static class Driver extends JettyListenerEventDriver {
private int pongCount;
Driver(WebSocketListener listener) {
super(WebSocketPolicy.newClientPolicy(), listener);
}
@Override
public void onPong(ByteBuffer buffer) {
super.onPong(buffer);
this.pongCount++;
}
public int getPongCount() {
return this.pongCount;
}
}
private static class Socket extends WebSocketAdapter {
private List<String> messages = new ArrayList<String>();
private Integer closeStatus;
@Override
public void onWebSocketText(String message) {
this.messages.add(message);
}
public String getMessages(int index) {
return this.messages.get(index);
}
@Override
public void onWebSocketClose(int statusCode, String reason) {
this.closeStatus = statusCode;
}
public Integer getCloseStatus() {
return this.closeStatus;
}
}
/**
* {@link LiveReloadServer} with additional monitoring.
*/
......@@ -262,6 +193,7 @@ public class LiveReloadServerTests {
super.run();
}
catch (ConnectionClosedException ex) {
ex.printStackTrace();
synchronized (MonitoredLiveReloadServer.this.monitor) {
MonitoredLiveReloadServer.this.closedExceptions.add(ex);
}
......@@ -273,4 +205,85 @@ public class LiveReloadServerTests {
}
private static class LiveReloadWebSocketHandler extends TextWebSocketHandler {
private WebSocketSession session;
private final CountDownLatch helloLatch = new CountDownLatch(2);
private CountDownLatch messagesLatch;
private final List<String> messages = new ArrayList<String>();
private int pongCount;
private CloseStatus closeStatus;
@Override
public void afterConnectionEstablished(WebSocketSession session)
throws Exception {
this.session = session;
session.sendMessage(new TextMessage(HANDSHAKE));
this.helloLatch.countDown();
}
public void awaitHello() throws InterruptedException {
this.helloLatch.await(1, TimeUnit.MINUTES);
Thread.sleep(200);
}
public void setExpectedMessageCount(int count) {
this.messagesLatch = new CountDownLatch(count);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message)
throws Exception {
if (message.getPayload().contains("hello")) {
this.helloLatch.countDown();
}
if (this.messagesLatch != null) {
this.messagesLatch.countDown();
}
this.messages.add(message.getPayload());
}
@Override
protected void handlePongMessage(WebSocketSession session, PongMessage message)
throws Exception {
this.pongCount++;
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status)
throws Exception {
this.closeStatus = status;
}
public void sendMessage(WebSocketMessage<?> message) throws IOException {
this.session.sendMessage(message);
}
public void close() throws IOException {
this.session.close();
}
public void awaitMessages() throws InterruptedException {
this.messagesLatch.await(1, TimeUnit.MINUTES);
}
public List<String> getMessages() {
return this.messages;
}
public int getPongCount() {
return this.pongCount;
}
public CloseStatus getCloseStatus() {
return this.closeStatus;
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment