Consistent SmartLifecycle implementations
Issue: SPR-14233
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2002-2014 the original author or authors.
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -315,7 +315,7 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess
|
||||
|
||||
@Override
|
||||
public void stop(Runnable callback) {
|
||||
this.stop();
|
||||
stop();
|
||||
callback.run();
|
||||
}
|
||||
|
||||
|
||||
@@ -272,13 +272,6 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan
|
||||
return Integer.MAX_VALUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean isRunning() {
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
return this.running;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void start() {
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
@@ -303,6 +296,13 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean isRunning() {
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
return this.running;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected List<HandlerMethodArgumentResolver> initArgumentResolvers() {
|
||||
ConfigurableBeanFactory beanFactory = (getApplicationContext() instanceof ConfigurableApplicationContext ?
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -16,8 +16,6 @@
|
||||
|
||||
package org.springframework.messaging.simp.user;
|
||||
|
||||
import static org.springframework.messaging.simp.SimpMessageHeaderAccessor.*;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
@@ -154,13 +152,6 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean isRunning() {
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
return this.running;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void start() {
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
@@ -187,6 +178,13 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean isRunning() {
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
return this.running;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void handleMessage(Message<?> message) throws MessagingException {
|
||||
@@ -211,7 +209,7 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
|
||||
}
|
||||
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(message);
|
||||
initHeaders(accessor);
|
||||
accessor.setNativeHeader(ORIGINAL_DESTINATION, result.getSubscribeDestination());
|
||||
accessor.setNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION, result.getSubscribeDestination());
|
||||
accessor.setLeaveMutable(true);
|
||||
message = MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
|
||||
if (logger.isTraceEnabled()) {
|
||||
@@ -242,18 +240,15 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
|
||||
|
||||
private static final List<String> NO_COPY_LIST = Arrays.asList("subscription", "message-id");
|
||||
|
||||
|
||||
private final MessageSendingOperations<String> messagingTemplate;
|
||||
|
||||
private final String broadcastDestination;
|
||||
|
||||
|
||||
public BroadcastHandler(MessageSendingOperations<String> template, String destination) {
|
||||
this.messagingTemplate = template;
|
||||
this.broadcastDestination = destination;
|
||||
}
|
||||
|
||||
|
||||
public String getBroadcastDestination() {
|
||||
return this.broadcastDestination;
|
||||
}
|
||||
@@ -263,12 +258,13 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
|
||||
if (!getBroadcastDestination().equals(destination)) {
|
||||
return message;
|
||||
}
|
||||
SimpMessageHeaderAccessor accessor = getAccessor(message, SimpMessageHeaderAccessor.class);
|
||||
SimpMessageHeaderAccessor accessor =
|
||||
SimpMessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class);
|
||||
if (accessor.getSessionId() == null) {
|
||||
// Our own broadcast
|
||||
return null;
|
||||
}
|
||||
destination = accessor.getFirstNativeHeader(ORIGINAL_DESTINATION);
|
||||
destination = accessor.getFirstNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Checking unresolved user destination: " + destination);
|
||||
}
|
||||
@@ -286,13 +282,14 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
|
||||
|
||||
public void handleUnresolved(Message<?> message) {
|
||||
MessageHeaders headers = message.getHeaders();
|
||||
if (SimpMessageHeaderAccessor.getFirstNativeHeader(ORIGINAL_DESTINATION, headers) != null) {
|
||||
if (SimpMessageHeaderAccessor.getFirstNativeHeader(
|
||||
SimpMessageHeaderAccessor.ORIGINAL_DESTINATION, headers) != null) {
|
||||
// Re-broadcast
|
||||
return;
|
||||
}
|
||||
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(message);
|
||||
String destination = accessor.getDestination();
|
||||
accessor.setNativeHeader(ORIGINAL_DESTINATION, destination);
|
||||
accessor.setNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION, destination);
|
||||
accessor.setLeaveMutable(true);
|
||||
message = MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
|
||||
if (logger.isTraceEnabled()) {
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2002-2012 the original author or authors.
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -308,7 +308,7 @@ public class GenericMessageEndpointManager implements SmartLifecycle, Initializi
|
||||
@Override
|
||||
public void stop(Runnable callback) {
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
this.stop();
|
||||
stop();
|
||||
callback.run();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -56,6 +56,10 @@ public abstract class ConnectionManagerSupport implements SmartLifecycle {
|
||||
}
|
||||
|
||||
|
||||
protected URI getUri() {
|
||||
return this.uri;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set whether to auto-connect to the remote endpoint after this connection manager
|
||||
* has been initialized and the Spring context has been refreshed.
|
||||
@@ -95,8 +99,61 @@ public abstract class ConnectionManagerSupport implements SmartLifecycle {
|
||||
return this.phase;
|
||||
}
|
||||
|
||||
protected URI getUri() {
|
||||
return this.uri;
|
||||
|
||||
/**
|
||||
* Start the WebSocket connection. If already connected, the method has no impact.
|
||||
*/
|
||||
@Override
|
||||
public final void start() {
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
if (!isRunning()) {
|
||||
startInternal();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void startInternal() {
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
if (logger.isInfoEnabled()) {
|
||||
logger.info("Starting " + getClass().getSimpleName());
|
||||
}
|
||||
this.running = true;
|
||||
openConnection();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void stop() {
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
if (isRunning()) {
|
||||
if (logger.isInfoEnabled()) {
|
||||
logger.info("Stopping " + getClass().getSimpleName());
|
||||
}
|
||||
try {
|
||||
stopInternal();
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
logger.error("Failed to stop WebSocket connection", ex);
|
||||
}
|
||||
finally {
|
||||
this.running = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void stop(Runnable callback) {
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
stop();
|
||||
callback.run();
|
||||
}
|
||||
}
|
||||
|
||||
protected void stopInternal() throws Exception {
|
||||
if (isConnected()) {
|
||||
closeConnection();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -109,66 +166,11 @@ public abstract class ConnectionManagerSupport implements SmartLifecycle {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the websocket connection. If already connected, the method has no impact.
|
||||
*/
|
||||
@Override
|
||||
public final void start() {
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
if (!isRunning()) {
|
||||
startInternal();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void startInternal() {
|
||||
synchronized (lifecycleMonitor) {
|
||||
if (logger.isInfoEnabled()) {
|
||||
logger.info("Starting " + this.getClass().getSimpleName());
|
||||
}
|
||||
this.running = true;
|
||||
openConnection();
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract void openConnection();
|
||||
|
||||
@Override
|
||||
public final void stop() {
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
if (isRunning()) {
|
||||
if (logger.isInfoEnabled()) {
|
||||
logger.info("Stopping " + this.getClass().getSimpleName());
|
||||
}
|
||||
try {
|
||||
stopInternal();
|
||||
}
|
||||
catch (Throwable e) {
|
||||
logger.error("Failed to stop WebSocket connection", e);
|
||||
}
|
||||
finally {
|
||||
this.running = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void stopInternal() throws Exception {
|
||||
if (isConnected()) {
|
||||
closeConnection();
|
||||
}
|
||||
}
|
||||
protected abstract void closeConnection() throws Exception;
|
||||
|
||||
protected abstract boolean isConnected();
|
||||
|
||||
protected abstract void closeConnection() throws Exception;
|
||||
|
||||
@Override
|
||||
public final void stop(Runnable callback) {
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
this.stop();
|
||||
callback.run();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -47,10 +47,10 @@ public class AnnotatedEndpointConnectionManager extends ConnectionManagerSupport
|
||||
|
||||
private WebSocketContainer webSocketContainer = ContainerProvider.getWebSocketContainer();
|
||||
|
||||
private Session session;
|
||||
|
||||
private TaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("AnnotatedEndpointConnectionManager-");
|
||||
|
||||
private volatile Session session;
|
||||
|
||||
|
||||
public AnnotatedEndpointConnectionManager(Object endpoint, String uriTemplate, Object... uriVariables) {
|
||||
super(uriTemplate, uriVariables);
|
||||
@@ -96,19 +96,22 @@ public class AnnotatedEndpointConnectionManager extends ConnectionManagerSupport
|
||||
return this.taskExecutor;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void openConnection() {
|
||||
this.taskExecutor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
logger.info("Connecting to WebSocket at " + getUri());
|
||||
if (logger.isInfoEnabled()) {
|
||||
logger.info("Connecting to WebSocket at " + getUri());
|
||||
}
|
||||
Object endpointToUse = (endpoint != null) ? endpoint : endpointProvider.getHandler();
|
||||
session = webSocketContainer.connectToServer(endpointToUse, getUri());
|
||||
logger.info("Successfully connected");
|
||||
logger.info("Successfully connected to WebSocket");
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
logger.error("Failed to connect", ex);
|
||||
logger.error("Failed to connect to WebSocket", ex);
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -128,7 +131,7 @@ public class AnnotatedEndpointConnectionManager extends ConnectionManagerSupport
|
||||
|
||||
@Override
|
||||
protected boolean isConnected() {
|
||||
return ((this.session != null) && this.session.isOpen());
|
||||
return (this.session != null && this.session.isOpen());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -28,7 +28,6 @@ import javax.websocket.Extension;
|
||||
import javax.websocket.Session;
|
||||
import javax.websocket.WebSocketContainer;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.BeanFactory;
|
||||
import org.springframework.beans.factory.BeanFactoryAware;
|
||||
import org.springframework.core.task.SimpleAsyncTaskExecutor;
|
||||
@@ -57,10 +56,10 @@ public class EndpointConnectionManager extends ConnectionManagerSupport implemen
|
||||
|
||||
private WebSocketContainer webSocketContainer = ContainerProvider.getWebSocketContainer();
|
||||
|
||||
private Session session;
|
||||
|
||||
private TaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("EndpointConnectionManager-");
|
||||
|
||||
private volatile Session session;
|
||||
|
||||
|
||||
public EndpointConnectionManager(Endpoint endpoint, String uriTemplate, Object... uriVariables) {
|
||||
super(uriTemplate, uriVariables);
|
||||
@@ -106,7 +105,7 @@ public class EndpointConnectionManager extends ConnectionManagerSupport implemen
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
|
||||
public void setBeanFactory(BeanFactory beanFactory) {
|
||||
if (this.endpointProvider != null) {
|
||||
this.endpointProvider.setBeanFactory(beanFactory);
|
||||
}
|
||||
@@ -135,14 +134,16 @@ public class EndpointConnectionManager extends ConnectionManagerSupport implemen
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
logger.info("Connecting to WebSocket at " + getUri());
|
||||
if (logger.isInfoEnabled()) {
|
||||
logger.info("Connecting to WebSocket at " + getUri());
|
||||
}
|
||||
Endpoint endpointToUse = (endpoint != null) ? endpoint : endpointProvider.getHandler();
|
||||
ClientEndpointConfig endpointConfig = configBuilder.build();
|
||||
session = getWebSocketContainer().connectToServer(endpointToUse, endpointConfig, getUri());
|
||||
logger.info("Successfully connected");
|
||||
logger.info("Successfully connected to WebSocket");
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
logger.error("Failed to connect", ex);
|
||||
logger.error("Failed to connect to WebSocket", ex);
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -162,7 +163,7 @@ public class EndpointConnectionManager extends ConnectionManagerSupport implemen
|
||||
|
||||
@Override
|
||||
protected boolean isConnected() {
|
||||
return ((this.session != null) && this.session.isOpen());
|
||||
return (this.session != null && this.session.isOpen());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -239,13 +239,6 @@ public class SubProtocolWebSocketHandler
|
||||
return Integer.MAX_VALUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean isRunning() {
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
return this.running;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void start() {
|
||||
Assert.isTrue(this.defaultProtocolHandler != null || !this.protocolHandlers.isEmpty(), "No handlers");
|
||||
@@ -281,6 +274,13 @@ public class SubProtocolWebSocketHandler
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean isRunning() {
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
return this.running;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -91,7 +91,7 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
|
||||
* @param webSocketClient the WebSocket client to connect with
|
||||
*/
|
||||
public WebSocketStompClient(WebSocketClient webSocketClient) {
|
||||
Assert.notNull(webSocketClient, "'webSocketClient' is required.");
|
||||
Assert.notNull(webSocketClient, "WebSocketClient is required");
|
||||
this.webSocketClient = webSocketClient;
|
||||
setDefaultHeartbeat(new long[] {0, 0});
|
||||
}
|
||||
@@ -153,11 +153,6 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
|
||||
return this.autoStartup;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return this.running;
|
||||
}
|
||||
|
||||
/**
|
||||
* Specify the phase in which the WebSocket client should be started and
|
||||
* subsequently closed. The startup order proceeds from lowest to highest,
|
||||
@@ -201,10 +196,16 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
|
||||
|
||||
@Override
|
||||
public void stop(Runnable callback) {
|
||||
this.stop();
|
||||
stop();
|
||||
callback.run();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return this.running;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Connect to the given WebSocket URL and notify the given
|
||||
* {@link org.springframework.messaging.simp.stomp.StompSessionHandler}
|
||||
@@ -249,7 +250,7 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
|
||||
public ListenableFuture<StompSession> connect(String url, WebSocketHttpHeaders handshakeHeaders,
|
||||
StompHeaders connectHeaders, StompSessionHandler handler, Object... uriVariables) {
|
||||
|
||||
Assert.notNull(url, "uriTemplate must not be null");
|
||||
Assert.notNull(url, "'url' must not be null");
|
||||
URI uri = UriComponentsBuilder.fromUriString(url).buildAndExpand(uriVariables).encode().toUri();
|
||||
return connect(uri, handshakeHeaders, connectHeaders, handler);
|
||||
}
|
||||
@@ -267,7 +268,7 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
|
||||
public ListenableFuture<StompSession> connect(URI url, WebSocketHttpHeaders handshakeHeaders,
|
||||
StompHeaders connectHeaders, StompSessionHandler sessionHandler) {
|
||||
|
||||
Assert.notNull(url, "'uri' must not be null");
|
||||
Assert.notNull(url, "'url' must not be null");
|
||||
ConnectionHandlingStompSession session = createSession(connectHeaders, sessionHandler);
|
||||
WebSocketTcpConnectionHandlerAdapter adapter = new WebSocketTcpConnectionHandlerAdapter(session);
|
||||
getWebSocketClient().doHandshake(adapter, handshakeHeaders, url).addCallback(adapter);
|
||||
@@ -278,7 +279,7 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
|
||||
protected StompHeaders processConnectHeaders(StompHeaders connectHeaders) {
|
||||
connectHeaders = super.processConnectHeaders(connectHeaders);
|
||||
if (connectHeaders.isHeartbeatEnabled()) {
|
||||
Assert.notNull(getTaskScheduler(), "TaskScheduler cannot be null if heartbeats are enabled.");
|
||||
Assert.state(getTaskScheduler() != null, "TaskScheduler must be set if heartbeats are enabled");
|
||||
}
|
||||
return connectHeaders;
|
||||
}
|
||||
@@ -303,7 +304,7 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
|
||||
private final List<ScheduledFuture<?>> inactivityTasks = new ArrayList<ScheduledFuture<?>>(2);
|
||||
|
||||
public WebSocketTcpConnectionHandlerAdapter(TcpConnectionHandler<byte[]> connectionHandler) {
|
||||
Assert.notNull(connectionHandler);
|
||||
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
|
||||
this.connectionHandler = connectionHandler;
|
||||
}
|
||||
|
||||
@@ -397,7 +398,7 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
|
||||
|
||||
@Override
|
||||
public void onReadInactivity(final Runnable runnable, final long duration) {
|
||||
Assert.notNull(getTaskScheduler(), "No scheduler configured.");
|
||||
Assert.state(getTaskScheduler() != null, "No TaskScheduler configured");
|
||||
this.lastReadTime = System.currentTimeMillis();
|
||||
this.inactivityTasks.add(getTaskScheduler().scheduleWithFixedDelay(new Runnable() {
|
||||
@Override
|
||||
@@ -418,7 +419,7 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
|
||||
|
||||
@Override
|
||||
public void onWriteInactivity(final Runnable runnable, final long duration) {
|
||||
Assert.notNull(getTaskScheduler(), "No scheduler configured.");
|
||||
Assert.state(getTaskScheduler() != null, "No TaskScheduler configured");
|
||||
this.lastWriteTime = System.currentTimeMillis();
|
||||
this.inactivityTasks.add(getTaskScheduler().scheduleWithFixedDelay(new Runnable() {
|
||||
@Override
|
||||
@@ -491,7 +492,7 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
|
||||
|
||||
public WebSocketMessage<?> encode(Message<byte[]> message, Class<? extends WebSocketSession> sessionType) {
|
||||
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
|
||||
Assert.notNull(accessor);
|
||||
Assert.notNull(accessor, "No StompHeaderAccessor available");
|
||||
byte[] payload = message.getPayload();
|
||||
byte[] bytes = ENCODER.encode(accessor.getMessageHeaders(), payload);
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -51,11 +51,6 @@ public class WebSocketHandlerMapping extends SimpleUrlHandlerMapping implements
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return this.running;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPhase() {
|
||||
return Integer.MAX_VALUE;
|
||||
@@ -91,4 +86,9 @@ public class WebSocketHandlerMapping extends SimpleUrlHandlerMapping implements
|
||||
callback.run();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return this.running;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -16,16 +16,12 @@
|
||||
|
||||
package org.springframework.web.socket.messaging;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
import java.net.URI;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
@@ -50,6 +46,9 @@ import org.springframework.web.socket.WebSocketHandler;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
import org.springframework.web.socket.client.WebSocketClient;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
/**
|
||||
* Unit tests for {@link WebSocketStompClient}.
|
||||
*
|
||||
@@ -91,6 +90,7 @@ public class WebSocketStompClientTests {
|
||||
.thenReturn(this.handshakeFuture);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void webSocketHandshakeFailure() throws Exception {
|
||||
connect();
|
||||
@@ -246,9 +246,9 @@ public class WebSocketStompClientTests {
|
||||
stompClient.setDefaultHeartbeat(new long[] {5, 5});
|
||||
try {
|
||||
stompClient.processConnectHeaders(null);
|
||||
fail("Expected exception");
|
||||
fail("Expected IllegalStateException");
|
||||
}
|
||||
catch (IllegalArgumentException ex) {
|
||||
catch (IllegalStateException ex) {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
@@ -308,7 +308,6 @@ public class WebSocketStompClientTests {
|
||||
|
||||
|
||||
private WebSocketHandler connect() {
|
||||
|
||||
this.stompClient.connect("/foo", mock(StompSessionHandler.class));
|
||||
|
||||
verify(this.stompSession).getSessionFuture();
|
||||
@@ -354,7 +353,6 @@ public class WebSocketStompClientTests {
|
||||
|
||||
private ConnectionHandlingStompSession stompSession;
|
||||
|
||||
|
||||
public TestWebSocketStompClient(WebSocketClient webSocketClient) {
|
||||
super(webSocketClient);
|
||||
}
|
||||
@@ -369,4 +367,4 @@ public class WebSocketStompClientTests {
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user