Consistent SmartLifecycle implementations
Issue: SPR-14233
This commit is contained in:
@@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* Copyright 2002-2014 the original author or authors.
|
* Copyright 2002-2016 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
@@ -315,7 +315,7 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stop(Runnable callback) {
|
public void stop(Runnable callback) {
|
||||||
this.stop();
|
stop();
|
||||||
callback.run();
|
callback.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -272,13 +272,6 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan
|
|||||||
return Integer.MAX_VALUE;
|
return Integer.MAX_VALUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public final boolean isRunning() {
|
|
||||||
synchronized (this.lifecycleMonitor) {
|
|
||||||
return this.running;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final void start() {
|
public final void start() {
|
||||||
synchronized (this.lifecycleMonitor) {
|
synchronized (this.lifecycleMonitor) {
|
||||||
@@ -303,6 +296,13 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final boolean isRunning() {
|
||||||
|
synchronized (this.lifecycleMonitor) {
|
||||||
|
return this.running;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
protected List<HandlerMethodArgumentResolver> initArgumentResolvers() {
|
protected List<HandlerMethodArgumentResolver> initArgumentResolvers() {
|
||||||
ConfigurableBeanFactory beanFactory = (getApplicationContext() instanceof ConfigurableApplicationContext ?
|
ConfigurableBeanFactory beanFactory = (getApplicationContext() instanceof ConfigurableApplicationContext ?
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* Copyright 2002-2015 the original author or authors.
|
* Copyright 2002-2016 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
@@ -16,8 +16,6 @@
|
|||||||
|
|
||||||
package org.springframework.messaging.simp.user;
|
package org.springframework.messaging.simp.user;
|
||||||
|
|
||||||
import static org.springframework.messaging.simp.SimpMessageHeaderAccessor.*;
|
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@@ -154,13 +152,6 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public final boolean isRunning() {
|
|
||||||
synchronized (this.lifecycleMonitor) {
|
|
||||||
return this.running;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final void start() {
|
public final void start() {
|
||||||
synchronized (this.lifecycleMonitor) {
|
synchronized (this.lifecycleMonitor) {
|
||||||
@@ -187,6 +178,13 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final boolean isRunning() {
|
||||||
|
synchronized (this.lifecycleMonitor) {
|
||||||
|
return this.running;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleMessage(Message<?> message) throws MessagingException {
|
public void handleMessage(Message<?> message) throws MessagingException {
|
||||||
@@ -211,7 +209,7 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
|
|||||||
}
|
}
|
||||||
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(message);
|
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(message);
|
||||||
initHeaders(accessor);
|
initHeaders(accessor);
|
||||||
accessor.setNativeHeader(ORIGINAL_DESTINATION, result.getSubscribeDestination());
|
accessor.setNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION, result.getSubscribeDestination());
|
||||||
accessor.setLeaveMutable(true);
|
accessor.setLeaveMutable(true);
|
||||||
message = MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
|
message = MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
@@ -242,18 +240,15 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
|
|||||||
|
|
||||||
private static final List<String> NO_COPY_LIST = Arrays.asList("subscription", "message-id");
|
private static final List<String> NO_COPY_LIST = Arrays.asList("subscription", "message-id");
|
||||||
|
|
||||||
|
|
||||||
private final MessageSendingOperations<String> messagingTemplate;
|
private final MessageSendingOperations<String> messagingTemplate;
|
||||||
|
|
||||||
private final String broadcastDestination;
|
private final String broadcastDestination;
|
||||||
|
|
||||||
|
|
||||||
public BroadcastHandler(MessageSendingOperations<String> template, String destination) {
|
public BroadcastHandler(MessageSendingOperations<String> template, String destination) {
|
||||||
this.messagingTemplate = template;
|
this.messagingTemplate = template;
|
||||||
this.broadcastDestination = destination;
|
this.broadcastDestination = destination;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public String getBroadcastDestination() {
|
public String getBroadcastDestination() {
|
||||||
return this.broadcastDestination;
|
return this.broadcastDestination;
|
||||||
}
|
}
|
||||||
@@ -263,12 +258,13 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
|
|||||||
if (!getBroadcastDestination().equals(destination)) {
|
if (!getBroadcastDestination().equals(destination)) {
|
||||||
return message;
|
return message;
|
||||||
}
|
}
|
||||||
SimpMessageHeaderAccessor accessor = getAccessor(message, SimpMessageHeaderAccessor.class);
|
SimpMessageHeaderAccessor accessor =
|
||||||
|
SimpMessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class);
|
||||||
if (accessor.getSessionId() == null) {
|
if (accessor.getSessionId() == null) {
|
||||||
// Our own broadcast
|
// Our own broadcast
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
destination = accessor.getFirstNativeHeader(ORIGINAL_DESTINATION);
|
destination = accessor.getFirstNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION);
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("Checking unresolved user destination: " + destination);
|
logger.trace("Checking unresolved user destination: " + destination);
|
||||||
}
|
}
|
||||||
@@ -286,13 +282,14 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
|
|||||||
|
|
||||||
public void handleUnresolved(Message<?> message) {
|
public void handleUnresolved(Message<?> message) {
|
||||||
MessageHeaders headers = message.getHeaders();
|
MessageHeaders headers = message.getHeaders();
|
||||||
if (SimpMessageHeaderAccessor.getFirstNativeHeader(ORIGINAL_DESTINATION, headers) != null) {
|
if (SimpMessageHeaderAccessor.getFirstNativeHeader(
|
||||||
|
SimpMessageHeaderAccessor.ORIGINAL_DESTINATION, headers) != null) {
|
||||||
// Re-broadcast
|
// Re-broadcast
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(message);
|
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(message);
|
||||||
String destination = accessor.getDestination();
|
String destination = accessor.getDestination();
|
||||||
accessor.setNativeHeader(ORIGINAL_DESTINATION, destination);
|
accessor.setNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION, destination);
|
||||||
accessor.setLeaveMutable(true);
|
accessor.setLeaveMutable(true);
|
||||||
message = MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
|
message = MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* Copyright 2002-2012 the original author or authors.
|
* Copyright 2002-2016 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
@@ -308,7 +308,7 @@ public class GenericMessageEndpointManager implements SmartLifecycle, Initializi
|
|||||||
@Override
|
@Override
|
||||||
public void stop(Runnable callback) {
|
public void stop(Runnable callback) {
|
||||||
synchronized (this.lifecycleMonitor) {
|
synchronized (this.lifecycleMonitor) {
|
||||||
this.stop();
|
stop();
|
||||||
callback.run();
|
callback.run();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* Copyright 2002-2013 the original author or authors.
|
* Copyright 2002-2016 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
@@ -56,6 +56,10 @@ public abstract class ConnectionManagerSupport implements SmartLifecycle {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected URI getUri() {
|
||||||
|
return this.uri;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set whether to auto-connect to the remote endpoint after this connection manager
|
* Set whether to auto-connect to the remote endpoint after this connection manager
|
||||||
* has been initialized and the Spring context has been refreshed.
|
* has been initialized and the Spring context has been refreshed.
|
||||||
@@ -95,8 +99,61 @@ public abstract class ConnectionManagerSupport implements SmartLifecycle {
|
|||||||
return this.phase;
|
return this.phase;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected URI getUri() {
|
|
||||||
return this.uri;
|
/**
|
||||||
|
* Start the WebSocket connection. If already connected, the method has no impact.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public final void start() {
|
||||||
|
synchronized (this.lifecycleMonitor) {
|
||||||
|
if (!isRunning()) {
|
||||||
|
startInternal();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void startInternal() {
|
||||||
|
synchronized (this.lifecycleMonitor) {
|
||||||
|
if (logger.isInfoEnabled()) {
|
||||||
|
logger.info("Starting " + getClass().getSimpleName());
|
||||||
|
}
|
||||||
|
this.running = true;
|
||||||
|
openConnection();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final void stop() {
|
||||||
|
synchronized (this.lifecycleMonitor) {
|
||||||
|
if (isRunning()) {
|
||||||
|
if (logger.isInfoEnabled()) {
|
||||||
|
logger.info("Stopping " + getClass().getSimpleName());
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
stopInternal();
|
||||||
|
}
|
||||||
|
catch (Throwable ex) {
|
||||||
|
logger.error("Failed to stop WebSocket connection", ex);
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
this.running = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final void stop(Runnable callback) {
|
||||||
|
synchronized (this.lifecycleMonitor) {
|
||||||
|
stop();
|
||||||
|
callback.run();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void stopInternal() throws Exception {
|
||||||
|
if (isConnected()) {
|
||||||
|
closeConnection();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -109,66 +166,11 @@ public abstract class ConnectionManagerSupport implements SmartLifecycle {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Start the websocket connection. If already connected, the method has no impact.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public final void start() {
|
|
||||||
synchronized (this.lifecycleMonitor) {
|
|
||||||
if (!isRunning()) {
|
|
||||||
startInternal();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void startInternal() {
|
|
||||||
synchronized (lifecycleMonitor) {
|
|
||||||
if (logger.isInfoEnabled()) {
|
|
||||||
logger.info("Starting " + this.getClass().getSimpleName());
|
|
||||||
}
|
|
||||||
this.running = true;
|
|
||||||
openConnection();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected abstract void openConnection();
|
protected abstract void openConnection();
|
||||||
|
|
||||||
@Override
|
protected abstract void closeConnection() throws Exception;
|
||||||
public final void stop() {
|
|
||||||
synchronized (this.lifecycleMonitor) {
|
|
||||||
if (isRunning()) {
|
|
||||||
if (logger.isInfoEnabled()) {
|
|
||||||
logger.info("Stopping " + this.getClass().getSimpleName());
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
stopInternal();
|
|
||||||
}
|
|
||||||
catch (Throwable e) {
|
|
||||||
logger.error("Failed to stop WebSocket connection", e);
|
|
||||||
}
|
|
||||||
finally {
|
|
||||||
this.running = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void stopInternal() throws Exception {
|
|
||||||
if (isConnected()) {
|
|
||||||
closeConnection();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected abstract boolean isConnected();
|
protected abstract boolean isConnected();
|
||||||
|
|
||||||
protected abstract void closeConnection() throws Exception;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public final void stop(Runnable callback) {
|
|
||||||
synchronized (this.lifecycleMonitor) {
|
|
||||||
this.stop();
|
|
||||||
callback.run();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* Copyright 2002-2013 the original author or authors.
|
* Copyright 2002-2016 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
@@ -47,10 +47,10 @@ public class AnnotatedEndpointConnectionManager extends ConnectionManagerSupport
|
|||||||
|
|
||||||
private WebSocketContainer webSocketContainer = ContainerProvider.getWebSocketContainer();
|
private WebSocketContainer webSocketContainer = ContainerProvider.getWebSocketContainer();
|
||||||
|
|
||||||
private Session session;
|
|
||||||
|
|
||||||
private TaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("AnnotatedEndpointConnectionManager-");
|
private TaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("AnnotatedEndpointConnectionManager-");
|
||||||
|
|
||||||
|
private volatile Session session;
|
||||||
|
|
||||||
|
|
||||||
public AnnotatedEndpointConnectionManager(Object endpoint, String uriTemplate, Object... uriVariables) {
|
public AnnotatedEndpointConnectionManager(Object endpoint, String uriTemplate, Object... uriVariables) {
|
||||||
super(uriTemplate, uriVariables);
|
super(uriTemplate, uriVariables);
|
||||||
@@ -96,19 +96,22 @@ public class AnnotatedEndpointConnectionManager extends ConnectionManagerSupport
|
|||||||
return this.taskExecutor;
|
return this.taskExecutor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void openConnection() {
|
protected void openConnection() {
|
||||||
this.taskExecutor.execute(new Runnable() {
|
this.taskExecutor.execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
logger.info("Connecting to WebSocket at " + getUri());
|
if (logger.isInfoEnabled()) {
|
||||||
|
logger.info("Connecting to WebSocket at " + getUri());
|
||||||
|
}
|
||||||
Object endpointToUse = (endpoint != null) ? endpoint : endpointProvider.getHandler();
|
Object endpointToUse = (endpoint != null) ? endpoint : endpointProvider.getHandler();
|
||||||
session = webSocketContainer.connectToServer(endpointToUse, getUri());
|
session = webSocketContainer.connectToServer(endpointToUse, getUri());
|
||||||
logger.info("Successfully connected");
|
logger.info("Successfully connected to WebSocket");
|
||||||
}
|
}
|
||||||
catch (Throwable ex) {
|
catch (Throwable ex) {
|
||||||
logger.error("Failed to connect", ex);
|
logger.error("Failed to connect to WebSocket", ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -128,7 +131,7 @@ public class AnnotatedEndpointConnectionManager extends ConnectionManagerSupport
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean isConnected() {
|
protected boolean isConnected() {
|
||||||
return ((this.session != null) && this.session.isOpen());
|
return (this.session != null && this.session.isOpen());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* Copyright 2002-2013 the original author or authors.
|
* Copyright 2002-2016 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
@@ -28,7 +28,6 @@ import javax.websocket.Extension;
|
|||||||
import javax.websocket.Session;
|
import javax.websocket.Session;
|
||||||
import javax.websocket.WebSocketContainer;
|
import javax.websocket.WebSocketContainer;
|
||||||
|
|
||||||
import org.springframework.beans.BeansException;
|
|
||||||
import org.springframework.beans.factory.BeanFactory;
|
import org.springframework.beans.factory.BeanFactory;
|
||||||
import org.springframework.beans.factory.BeanFactoryAware;
|
import org.springframework.beans.factory.BeanFactoryAware;
|
||||||
import org.springframework.core.task.SimpleAsyncTaskExecutor;
|
import org.springframework.core.task.SimpleAsyncTaskExecutor;
|
||||||
@@ -57,10 +56,10 @@ public class EndpointConnectionManager extends ConnectionManagerSupport implemen
|
|||||||
|
|
||||||
private WebSocketContainer webSocketContainer = ContainerProvider.getWebSocketContainer();
|
private WebSocketContainer webSocketContainer = ContainerProvider.getWebSocketContainer();
|
||||||
|
|
||||||
private Session session;
|
|
||||||
|
|
||||||
private TaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("EndpointConnectionManager-");
|
private TaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("EndpointConnectionManager-");
|
||||||
|
|
||||||
|
private volatile Session session;
|
||||||
|
|
||||||
|
|
||||||
public EndpointConnectionManager(Endpoint endpoint, String uriTemplate, Object... uriVariables) {
|
public EndpointConnectionManager(Endpoint endpoint, String uriTemplate, Object... uriVariables) {
|
||||||
super(uriTemplate, uriVariables);
|
super(uriTemplate, uriVariables);
|
||||||
@@ -106,7 +105,7 @@ public class EndpointConnectionManager extends ConnectionManagerSupport implemen
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
|
public void setBeanFactory(BeanFactory beanFactory) {
|
||||||
if (this.endpointProvider != null) {
|
if (this.endpointProvider != null) {
|
||||||
this.endpointProvider.setBeanFactory(beanFactory);
|
this.endpointProvider.setBeanFactory(beanFactory);
|
||||||
}
|
}
|
||||||
@@ -135,14 +134,16 @@ public class EndpointConnectionManager extends ConnectionManagerSupport implemen
|
|||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
logger.info("Connecting to WebSocket at " + getUri());
|
if (logger.isInfoEnabled()) {
|
||||||
|
logger.info("Connecting to WebSocket at " + getUri());
|
||||||
|
}
|
||||||
Endpoint endpointToUse = (endpoint != null) ? endpoint : endpointProvider.getHandler();
|
Endpoint endpointToUse = (endpoint != null) ? endpoint : endpointProvider.getHandler();
|
||||||
ClientEndpointConfig endpointConfig = configBuilder.build();
|
ClientEndpointConfig endpointConfig = configBuilder.build();
|
||||||
session = getWebSocketContainer().connectToServer(endpointToUse, endpointConfig, getUri());
|
session = getWebSocketContainer().connectToServer(endpointToUse, endpointConfig, getUri());
|
||||||
logger.info("Successfully connected");
|
logger.info("Successfully connected to WebSocket");
|
||||||
}
|
}
|
||||||
catch (Throwable ex) {
|
catch (Throwable ex) {
|
||||||
logger.error("Failed to connect", ex);
|
logger.error("Failed to connect to WebSocket", ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -162,7 +163,7 @@ public class EndpointConnectionManager extends ConnectionManagerSupport implemen
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean isConnected() {
|
protected boolean isConnected() {
|
||||||
return ((this.session != null) && this.session.isOpen());
|
return (this.session != null && this.session.isOpen());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* Copyright 2002-2015 the original author or authors.
|
* Copyright 2002-2016 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
@@ -239,13 +239,6 @@ public class SubProtocolWebSocketHandler
|
|||||||
return Integer.MAX_VALUE;
|
return Integer.MAX_VALUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public final boolean isRunning() {
|
|
||||||
synchronized (this.lifecycleMonitor) {
|
|
||||||
return this.running;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final void start() {
|
public final void start() {
|
||||||
Assert.isTrue(this.defaultProtocolHandler != null || !this.protocolHandlers.isEmpty(), "No handlers");
|
Assert.isTrue(this.defaultProtocolHandler != null || !this.protocolHandlers.isEmpty(), "No handlers");
|
||||||
@@ -281,6 +274,13 @@ public class SubProtocolWebSocketHandler
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final boolean isRunning() {
|
||||||
|
synchronized (this.lifecycleMonitor) {
|
||||||
|
return this.running;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
|
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* Copyright 2002-2015 the original author or authors.
|
* Copyright 2002-2016 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
@@ -91,7 +91,7 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
|
|||||||
* @param webSocketClient the WebSocket client to connect with
|
* @param webSocketClient the WebSocket client to connect with
|
||||||
*/
|
*/
|
||||||
public WebSocketStompClient(WebSocketClient webSocketClient) {
|
public WebSocketStompClient(WebSocketClient webSocketClient) {
|
||||||
Assert.notNull(webSocketClient, "'webSocketClient' is required.");
|
Assert.notNull(webSocketClient, "WebSocketClient is required");
|
||||||
this.webSocketClient = webSocketClient;
|
this.webSocketClient = webSocketClient;
|
||||||
setDefaultHeartbeat(new long[] {0, 0});
|
setDefaultHeartbeat(new long[] {0, 0});
|
||||||
}
|
}
|
||||||
@@ -153,11 +153,6 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
|
|||||||
return this.autoStartup;
|
return this.autoStartup;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isRunning() {
|
|
||||||
return this.running;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Specify the phase in which the WebSocket client should be started and
|
* Specify the phase in which the WebSocket client should be started and
|
||||||
* subsequently closed. The startup order proceeds from lowest to highest,
|
* subsequently closed. The startup order proceeds from lowest to highest,
|
||||||
@@ -201,10 +196,16 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stop(Runnable callback) {
|
public void stop(Runnable callback) {
|
||||||
this.stop();
|
stop();
|
||||||
callback.run();
|
callback.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isRunning() {
|
||||||
|
return this.running;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Connect to the given WebSocket URL and notify the given
|
* Connect to the given WebSocket URL and notify the given
|
||||||
* {@link org.springframework.messaging.simp.stomp.StompSessionHandler}
|
* {@link org.springframework.messaging.simp.stomp.StompSessionHandler}
|
||||||
@@ -249,7 +250,7 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
|
|||||||
public ListenableFuture<StompSession> connect(String url, WebSocketHttpHeaders handshakeHeaders,
|
public ListenableFuture<StompSession> connect(String url, WebSocketHttpHeaders handshakeHeaders,
|
||||||
StompHeaders connectHeaders, StompSessionHandler handler, Object... uriVariables) {
|
StompHeaders connectHeaders, StompSessionHandler handler, Object... uriVariables) {
|
||||||
|
|
||||||
Assert.notNull(url, "uriTemplate must not be null");
|
Assert.notNull(url, "'url' must not be null");
|
||||||
URI uri = UriComponentsBuilder.fromUriString(url).buildAndExpand(uriVariables).encode().toUri();
|
URI uri = UriComponentsBuilder.fromUriString(url).buildAndExpand(uriVariables).encode().toUri();
|
||||||
return connect(uri, handshakeHeaders, connectHeaders, handler);
|
return connect(uri, handshakeHeaders, connectHeaders, handler);
|
||||||
}
|
}
|
||||||
@@ -267,7 +268,7 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
|
|||||||
public ListenableFuture<StompSession> connect(URI url, WebSocketHttpHeaders handshakeHeaders,
|
public ListenableFuture<StompSession> connect(URI url, WebSocketHttpHeaders handshakeHeaders,
|
||||||
StompHeaders connectHeaders, StompSessionHandler sessionHandler) {
|
StompHeaders connectHeaders, StompSessionHandler sessionHandler) {
|
||||||
|
|
||||||
Assert.notNull(url, "'uri' must not be null");
|
Assert.notNull(url, "'url' must not be null");
|
||||||
ConnectionHandlingStompSession session = createSession(connectHeaders, sessionHandler);
|
ConnectionHandlingStompSession session = createSession(connectHeaders, sessionHandler);
|
||||||
WebSocketTcpConnectionHandlerAdapter adapter = new WebSocketTcpConnectionHandlerAdapter(session);
|
WebSocketTcpConnectionHandlerAdapter adapter = new WebSocketTcpConnectionHandlerAdapter(session);
|
||||||
getWebSocketClient().doHandshake(adapter, handshakeHeaders, url).addCallback(adapter);
|
getWebSocketClient().doHandshake(adapter, handshakeHeaders, url).addCallback(adapter);
|
||||||
@@ -278,7 +279,7 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
|
|||||||
protected StompHeaders processConnectHeaders(StompHeaders connectHeaders) {
|
protected StompHeaders processConnectHeaders(StompHeaders connectHeaders) {
|
||||||
connectHeaders = super.processConnectHeaders(connectHeaders);
|
connectHeaders = super.processConnectHeaders(connectHeaders);
|
||||||
if (connectHeaders.isHeartbeatEnabled()) {
|
if (connectHeaders.isHeartbeatEnabled()) {
|
||||||
Assert.notNull(getTaskScheduler(), "TaskScheduler cannot be null if heartbeats are enabled.");
|
Assert.state(getTaskScheduler() != null, "TaskScheduler must be set if heartbeats are enabled");
|
||||||
}
|
}
|
||||||
return connectHeaders;
|
return connectHeaders;
|
||||||
}
|
}
|
||||||
@@ -303,7 +304,7 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
|
|||||||
private final List<ScheduledFuture<?>> inactivityTasks = new ArrayList<ScheduledFuture<?>>(2);
|
private final List<ScheduledFuture<?>> inactivityTasks = new ArrayList<ScheduledFuture<?>>(2);
|
||||||
|
|
||||||
public WebSocketTcpConnectionHandlerAdapter(TcpConnectionHandler<byte[]> connectionHandler) {
|
public WebSocketTcpConnectionHandlerAdapter(TcpConnectionHandler<byte[]> connectionHandler) {
|
||||||
Assert.notNull(connectionHandler);
|
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
|
||||||
this.connectionHandler = connectionHandler;
|
this.connectionHandler = connectionHandler;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -397,7 +398,7 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onReadInactivity(final Runnable runnable, final long duration) {
|
public void onReadInactivity(final Runnable runnable, final long duration) {
|
||||||
Assert.notNull(getTaskScheduler(), "No scheduler configured.");
|
Assert.state(getTaskScheduler() != null, "No TaskScheduler configured");
|
||||||
this.lastReadTime = System.currentTimeMillis();
|
this.lastReadTime = System.currentTimeMillis();
|
||||||
this.inactivityTasks.add(getTaskScheduler().scheduleWithFixedDelay(new Runnable() {
|
this.inactivityTasks.add(getTaskScheduler().scheduleWithFixedDelay(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
@@ -418,7 +419,7 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onWriteInactivity(final Runnable runnable, final long duration) {
|
public void onWriteInactivity(final Runnable runnable, final long duration) {
|
||||||
Assert.notNull(getTaskScheduler(), "No scheduler configured.");
|
Assert.state(getTaskScheduler() != null, "No TaskScheduler configured");
|
||||||
this.lastWriteTime = System.currentTimeMillis();
|
this.lastWriteTime = System.currentTimeMillis();
|
||||||
this.inactivityTasks.add(getTaskScheduler().scheduleWithFixedDelay(new Runnable() {
|
this.inactivityTasks.add(getTaskScheduler().scheduleWithFixedDelay(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
@@ -491,7 +492,7 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
|
|||||||
|
|
||||||
public WebSocketMessage<?> encode(Message<byte[]> message, Class<? extends WebSocketSession> sessionType) {
|
public WebSocketMessage<?> encode(Message<byte[]> message, Class<? extends WebSocketSession> sessionType) {
|
||||||
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
|
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
|
||||||
Assert.notNull(accessor);
|
Assert.notNull(accessor, "No StompHeaderAccessor available");
|
||||||
byte[] payload = message.getPayload();
|
byte[] payload = message.getPayload();
|
||||||
byte[] bytes = ENCODER.encode(accessor.getMessageHeaders(), payload);
|
byte[] bytes = ENCODER.encode(accessor.getMessageHeaders(), payload);
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* Copyright 2002-2015 the original author or authors.
|
* Copyright 2002-2016 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
@@ -51,11 +51,6 @@ public class WebSocketHandlerMapping extends SimpleUrlHandlerMapping implements
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isRunning() {
|
|
||||||
return this.running;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getPhase() {
|
public int getPhase() {
|
||||||
return Integer.MAX_VALUE;
|
return Integer.MAX_VALUE;
|
||||||
@@ -91,4 +86,9 @@ public class WebSocketHandlerMapping extends SimpleUrlHandlerMapping implements
|
|||||||
callback.run();
|
callback.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isRunning() {
|
||||||
|
return this.running;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* Copyright 2002-2015 the original author or authors.
|
* Copyright 2002-2016 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
@@ -16,16 +16,12 @@
|
|||||||
|
|
||||||
package org.springframework.web.socket.messaging;
|
package org.springframework.web.socket.messaging;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
|
||||||
import static org.mockito.Mockito.*;
|
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import org.mockito.ArgumentCaptor;
|
import org.mockito.ArgumentCaptor;
|
||||||
import org.mockito.Mock;
|
import org.mockito.Mock;
|
||||||
import org.mockito.MockitoAnnotations;
|
import org.mockito.MockitoAnnotations;
|
||||||
@@ -50,6 +46,9 @@ import org.springframework.web.socket.WebSocketHandler;
|
|||||||
import org.springframework.web.socket.WebSocketSession;
|
import org.springframework.web.socket.WebSocketSession;
|
||||||
import org.springframework.web.socket.client.WebSocketClient;
|
import org.springframework.web.socket.client.WebSocketClient;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unit tests for {@link WebSocketStompClient}.
|
* Unit tests for {@link WebSocketStompClient}.
|
||||||
*
|
*
|
||||||
@@ -91,6 +90,7 @@ public class WebSocketStompClientTests {
|
|||||||
.thenReturn(this.handshakeFuture);
|
.thenReturn(this.handshakeFuture);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void webSocketHandshakeFailure() throws Exception {
|
public void webSocketHandshakeFailure() throws Exception {
|
||||||
connect();
|
connect();
|
||||||
@@ -246,9 +246,9 @@ public class WebSocketStompClientTests {
|
|||||||
stompClient.setDefaultHeartbeat(new long[] {5, 5});
|
stompClient.setDefaultHeartbeat(new long[] {5, 5});
|
||||||
try {
|
try {
|
||||||
stompClient.processConnectHeaders(null);
|
stompClient.processConnectHeaders(null);
|
||||||
fail("Expected exception");
|
fail("Expected IllegalStateException");
|
||||||
}
|
}
|
||||||
catch (IllegalArgumentException ex) {
|
catch (IllegalStateException ex) {
|
||||||
// Ignore
|
// Ignore
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -308,7 +308,6 @@ public class WebSocketStompClientTests {
|
|||||||
|
|
||||||
|
|
||||||
private WebSocketHandler connect() {
|
private WebSocketHandler connect() {
|
||||||
|
|
||||||
this.stompClient.connect("/foo", mock(StompSessionHandler.class));
|
this.stompClient.connect("/foo", mock(StompSessionHandler.class));
|
||||||
|
|
||||||
verify(this.stompSession).getSessionFuture();
|
verify(this.stompSession).getSessionFuture();
|
||||||
@@ -354,7 +353,6 @@ public class WebSocketStompClientTests {
|
|||||||
|
|
||||||
private ConnectionHandlingStompSession stompSession;
|
private ConnectionHandlingStompSession stompSession;
|
||||||
|
|
||||||
|
|
||||||
public TestWebSocketStompClient(WebSocketClient webSocketClient) {
|
public TestWebSocketStompClient(WebSocketClient webSocketClient) {
|
||||||
super(webSocketClient);
|
super(webSocketClient);
|
||||||
}
|
}
|
||||||
@@ -369,4 +367,4 @@ public class WebSocketStompClientTests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user