From fcfacd9f83015d41a80abbe0cfd820953564d56d Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Mon, 23 Jan 2017 21:28:40 +0100 Subject: [PATCH] Polishing (cherry picked from commit f095aa2) --- .../cache/concurrent/ConcurrentMapCache.java | 13 +- ...lVariableTableParameterNameDiscoverer.java | 2 +- .../PrioritizedParameterNameDiscoverer.java | 15 +- ...dardReflectionParameterNameDiscoverer.java | 3 +- .../converter/ConvertingComparator.java | 6 +- .../util/InvalidMimeTypeException.java | 5 +- .../util/comparator/CompoundComparator.java | 6 +- .../config/AbstractJmsListenerEndpoint.java | 8 +- .../config/JmsListenerEndpointRegistrar.java | 5 +- .../jms/config/MethodJmsListenerEndpoint.java | 7 +- .../jms/core/BrowserCallback.java | 14 +- .../jms/core/ProducerCallback.java | 10 +- .../jms/core/SessionCallback.java | 16 +- .../stomp/StompBrokerRelayMessageHandler.java | 159 +++++++++--------- .../adapter/AbstractWebSocketSession.java | 3 +- .../client/AbstractClientSockJsSession.java | 112 ++++++------ .../web/socket/sockjs/frame/SockJsFrame.java | 137 +++++++-------- 17 files changed, 259 insertions(+), 262 deletions(-) diff --git a/spring-context/src/main/java/org/springframework/cache/concurrent/ConcurrentMapCache.java b/spring-context/src/main/java/org/springframework/cache/concurrent/ConcurrentMapCache.java index 7a762db541..a8f151652b 100644 --- a/spring-context/src/main/java/org/springframework/cache/concurrent/ConcurrentMapCache.java +++ b/spring-context/src/main/java/org/springframework/cache/concurrent/ConcurrentMapCache.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -186,9 +186,9 @@ public class ConcurrentMapCache extends AbstractValueAdaptingCache { try { return serializeValue(storeValue); } - catch (Exception ex) { - throw new IllegalArgumentException("Failed to serialize cache value '" - + userValue + "'. Does it implement Serializable?", ex); + catch (Throwable ex) { + throw new IllegalArgumentException("Failed to serialize cache value '" + userValue + + "'. Does it implement Serializable?", ex); } } else { @@ -213,9 +213,8 @@ public class ConcurrentMapCache extends AbstractValueAdaptingCache { try { return super.fromStoreValue(deserializeValue(storeValue)); } - catch (Exception ex) { - throw new IllegalArgumentException("Failed to deserialize cache value '" + - storeValue + "'", ex); + catch (Throwable ex) { + throw new IllegalArgumentException("Failed to deserialize cache value '" + storeValue + "'", ex); } } else { diff --git a/spring-core/src/main/java/org/springframework/core/LocalVariableTableParameterNameDiscoverer.java b/spring-core/src/main/java/org/springframework/core/LocalVariableTableParameterNameDiscoverer.java index cc09fd80d3..b9e5a6bdee 100644 --- a/spring-core/src/main/java/org/springframework/core/LocalVariableTableParameterNameDiscoverer.java +++ b/spring-core/src/main/java/org/springframework/core/LocalVariableTableParameterNameDiscoverer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-core/src/main/java/org/springframework/core/PrioritizedParameterNameDiscoverer.java b/spring-core/src/main/java/org/springframework/core/PrioritizedParameterNameDiscoverer.java index 086a0cc967..235f808733 100644 --- a/spring-core/src/main/java/org/springframework/core/PrioritizedParameterNameDiscoverer.java +++ b/spring-core/src/main/java/org/springframework/core/PrioritizedParameterNameDiscoverer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,12 +22,11 @@ import java.util.LinkedList; import java.util.List; /** - * ParameterNameDiscoverer implementation that tries several ParameterNameDiscoverers - * in succession. Those added first in the {@code addDiscoverer} method have - * highest priority. If one returns {@code null}, the next will be tried. + * {@link ParameterNameDiscoverer} implementation that tries several discoverer + * delegates in succession. Those added first in the {@code addDiscoverer} method + * have highest priority. If one returns {@code null}, the next will be tried. * - *

The default behavior is always to return {@code null} - * if no discoverer matches. + *

The default behavior is to return {@code null} if no discoverer matches. * * @author Rod Johnson * @author Juergen Hoeller @@ -40,8 +39,8 @@ public class PrioritizedParameterNameDiscoverer implements ParameterNameDiscover /** - * Add a further ParameterNameDiscoverer to the list of discoverers - * that this PrioritizedParameterNameDiscoverer checks. + * Add a further {@link ParameterNameDiscoverer} delegate to the list of + * discoverers that this {@code PrioritizedParameterNameDiscoverer} checks. */ public void addDiscoverer(ParameterNameDiscoverer pnd) { this.parameterNameDiscoverers.add(pnd); diff --git a/spring-core/src/main/java/org/springframework/core/StandardReflectionParameterNameDiscoverer.java b/spring-core/src/main/java/org/springframework/core/StandardReflectionParameterNameDiscoverer.java index 6ce5f08166..c5882dbabf 100644 --- a/spring-core/src/main/java/org/springframework/core/StandardReflectionParameterNameDiscoverer.java +++ b/spring-core/src/main/java/org/springframework/core/StandardReflectionParameterNameDiscoverer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,7 @@ import org.springframework.lang.UsesJava8; * * @author Juergen Hoeller * @since 4.0 + * @see java.lang.reflect.Method#getParameters() * @see java.lang.reflect.Parameter#getName() */ @UsesJava8 diff --git a/spring-core/src/main/java/org/springframework/core/convert/converter/ConvertingComparator.java b/spring-core/src/main/java/org/springframework/core/convert/converter/ConvertingComparator.java index a6b5a8f41b..0f7baa3f81 100644 --- a/spring-core/src/main/java/org/springframework/core/convert/converter/ConvertingComparator.java +++ b/spring-core/src/main/java/org/springframework/core/convert/converter/ConvertingComparator.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,9 +35,9 @@ import org.springframework.util.comparator.ComparableComparator; */ public class ConvertingComparator implements Comparator { - private Comparator comparator; + private final Comparator comparator; - private Converter converter; + private final Converter converter; /** diff --git a/spring-core/src/main/java/org/springframework/util/InvalidMimeTypeException.java b/spring-core/src/main/java/org/springframework/util/InvalidMimeTypeException.java index 7eff662059..983bdb03fa 100644 --- a/spring-core/src/main/java/org/springframework/util/InvalidMimeTypeException.java +++ b/spring-core/src/main/java/org/springframework/util/InvalidMimeTypeException.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,7 +27,7 @@ package org.springframework.util; @SuppressWarnings("serial") public class InvalidMimeTypeException extends IllegalArgumentException { - private String mimeType; + private final String mimeType; /** @@ -38,7 +38,6 @@ public class InvalidMimeTypeException extends IllegalArgumentException { public InvalidMimeTypeException(String mimeType, String message) { super("Invalid mime type \"" + mimeType + "\": " + message); this.mimeType = mimeType; - } diff --git a/spring-core/src/main/java/org/springframework/util/comparator/CompoundComparator.java b/spring-core/src/main/java/org/springframework/util/comparator/CompoundComparator.java index c04735c81d..582cf886e3 100644 --- a/spring-core/src/main/java/org/springframework/util/comparator/CompoundComparator.java +++ b/spring-core/src/main/java/org/springframework/util/comparator/CompoundComparator.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,7 +37,7 @@ import org.springframework.util.Assert; * @author Juergen Hoeller * @since 1.2.2 */ -@SuppressWarnings({ "serial", "rawtypes" }) +@SuppressWarnings({"serial", "rawtypes"}) public class CompoundComparator implements Comparator, Serializable { private final List comparators; @@ -64,7 +64,7 @@ public class CompoundComparator implements Comparator, Serializable { Assert.notNull(comparators, "Comparators must not be null"); this.comparators = new ArrayList(comparators.length); for (Comparator comparator : comparators) { - this.addComparator(comparator); + addComparator(comparator); } } diff --git a/spring-jms/src/main/java/org/springframework/jms/config/AbstractJmsListenerEndpoint.java b/spring-jms/src/main/java/org/springframework/jms/config/AbstractJmsListenerEndpoint.java index d244dd2fc1..21ae508db8 100644 --- a/spring-jms/src/main/java/org/springframework/jms/config/AbstractJmsListenerEndpoint.java +++ b/spring-jms/src/main/java/org/springframework/jms/config/AbstractJmsListenerEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,12 +22,12 @@ import org.springframework.jms.listener.AbstractMessageListenerContainer; import org.springframework.jms.listener.MessageListenerContainer; import org.springframework.jms.listener.endpoint.JmsActivationSpecConfig; import org.springframework.jms.listener.endpoint.JmsMessageEndpointManager; -import org.springframework.util.Assert; /** * Base model for a JMS listener endpoint * * @author Stephane Nicoll + * @author Juergen Hoeller * @since 4.1 * @see MethodJmsListenerEndpoint * @see SimpleJmsListenerEndpoint @@ -150,7 +150,9 @@ public abstract class AbstractJmsListenerEndpoint implements JmsListenerEndpoint private void setupMessageListener(MessageListenerContainer container) { MessageListener messageListener = createMessageListener(container); - Assert.state(messageListener != null, "Endpoint [" + this + "] must provide a non null message listener"); + if (messageListener == null) { + throw new IllegalStateException("Endpoint [" + this + "] must provide a non-null message listener"); + } container.setupMessageListener(messageListener); } diff --git a/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpointRegistrar.java b/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpointRegistrar.java index ad042cb8bf..52405f5ca8 100644 --- a/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpointRegistrar.java +++ b/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpointRegistrar.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -118,8 +118,7 @@ public class JmsListenerEndpointRegistrar implements BeanFactoryAware, Initializ public void setBeanFactory(BeanFactory beanFactory) { this.beanFactory = beanFactory; if (beanFactory instanceof ConfigurableBeanFactory) { - ConfigurableBeanFactory cbf = (ConfigurableBeanFactory) beanFactory; - this.mutex = cbf.getSingletonMutex(); + this.mutex = ((ConfigurableBeanFactory) beanFactory).getSingletonMutex(); } } diff --git a/spring-jms/src/main/java/org/springframework/jms/config/MethodJmsListenerEndpoint.java b/spring-jms/src/main/java/org/springframework/jms/config/MethodJmsListenerEndpoint.java index b7379d2035..0b0bd8f280 100644 --- a/spring-jms/src/main/java/org/springframework/jms/config/MethodJmsListenerEndpoint.java +++ b/spring-jms/src/main/java/org/springframework/jms/config/MethodJmsListenerEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -57,8 +57,6 @@ public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint imple private StringValueResolver embeddedValueResolver; - private BeanFactory beanFactory; - /** * Set the actual bean instance to invoke this endpoint method on. @@ -122,11 +120,10 @@ public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint imple } /** - * Set the {@link BeanFactory} to use to resolve expressions (can be {@code null}). + * Set the {@link BeanFactory} to use to resolve expressions (may be {@code null}). */ @Override public void setBeanFactory(BeanFactory beanFactory) { - this.beanFactory = beanFactory; if (this.embeddedValueResolver == null && beanFactory instanceof ConfigurableBeanFactory) { this.embeddedValueResolver = new EmbeddedValueResolver((ConfigurableBeanFactory) beanFactory); } diff --git a/spring-jms/src/main/java/org/springframework/jms/core/BrowserCallback.java b/spring-jms/src/main/java/org/springframework/jms/core/BrowserCallback.java index 563503eb07..143858153a 100644 --- a/spring-jms/src/main/java/org/springframework/jms/core/BrowserCallback.java +++ b/spring-jms/src/main/java/org/springframework/jms/core/BrowserCallback.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,8 +23,9 @@ import javax.jms.Session; /** * Callback for browsing the messages in a JMS queue. * - *

To be used with JmsTemplate's callback methods that take a BrowserCallback - * argument, often implemented as an anonymous inner class. + *

To be used with {@link JmsTemplate}'s callback methods that take a + * {@link BrowserCallback} argument, often implemented as an anonymous + * inner class or as a lambda expression. * * @author Juergen Hoeller * @since 2.5.1 @@ -34,11 +35,12 @@ import javax.jms.Session; public interface BrowserCallback { /** - * Perform operations on the given {@link javax.jms.Session} and {@link javax.jms.QueueBrowser}. - *

The message producer is not associated with any destination. + * Perform operations on the given {@link javax.jms.Session} and + * {@link javax.jms.QueueBrowser}. * @param session the JMS {@code Session} object to use * @param browser the JMS {@code QueueBrowser} object to use - * @return a result object from working with the {@code Session}, if any (can be {@code null}) + * @return a result object from working with the {@code Session}, if any + * (or {@code null} if none) * @throws javax.jms.JMSException if thrown by JMS API methods */ T doInJms(Session session, QueueBrowser browser) throws JMSException; diff --git a/spring-jms/src/main/java/org/springframework/jms/core/ProducerCallback.java b/spring-jms/src/main/java/org/springframework/jms/core/ProducerCallback.java index 6297b67983..87e8eb915a 100644 --- a/spring-jms/src/main/java/org/springframework/jms/core/ProducerCallback.java +++ b/spring-jms/src/main/java/org/springframework/jms/core/ProducerCallback.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,8 +23,9 @@ import javax.jms.Session; /** * Callback for sending a message to a JMS destination. * - *

To be used with JmsTemplate's callback methods that take a ProducerCallback - * argument, often implemented as an anonymous inner class. + *

To be used with {@link JmsTemplate}'s callback methods that take a + * {@link ProducerCallback} argument, often implemented as an anonymous + * inner class or as a lambda expression. * *

The typical implementation will perform multiple operations on the * supplied JMS {@link Session} and {@link MessageProducer}. @@ -43,7 +44,8 @@ public interface ProducerCallback { * when specified in the JmsTemplate call. * @param session the JMS {@code Session} object to use * @param producer the JMS {@code MessageProducer} object to use - * @return a result object from working with the {@code Session}, if any (can be {@code null}) + * @return a result object from working with the {@code Session}, if any + * (or {@code null} if none) * @throws javax.jms.JMSException if thrown by JMS API methods */ T doInJms(Session session, MessageProducer producer) throws JMSException; diff --git a/spring-jms/src/main/java/org/springframework/jms/core/SessionCallback.java b/spring-jms/src/main/java/org/springframework/jms/core/SessionCallback.java index 77c57614e2..7e7b192612 100644 --- a/spring-jms/src/main/java/org/springframework/jms/core/SessionCallback.java +++ b/spring-jms/src/main/java/org/springframework/jms/core/SessionCallback.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,11 +20,10 @@ import javax.jms.JMSException; import javax.jms.Session; /** - * Callback for executing any number of operations on a provided - * {@link Session}. + * Callback for executing any number of operations on a provided {@link Session}. * - *

To be used with the {@link JmsTemplate#execute(SessionCallback)} - * method, often implemented as an anonymous inner class. + *

To be used with the {@link JmsTemplate#execute(SessionCallback)} method, + * often implemented as an anonymous inner class or as a lambda expression. * * @author Mark Pollack * @since 1.1 @@ -33,10 +32,11 @@ import javax.jms.Session; public interface SessionCallback { /** - * Execute any number of operations against the supplied JMS - * {@link Session}, possibly returning a result. + * Execute any number of operations against the supplied JMS {@link Session}, + * possibly returning a result. * @param session the JMS {@code Session} - * @return a result object from working with the {@code Session}, if any (so can be {@code null}) + * @return a result object from working with the {@code Session}, if any + * (or {@code null} if none) * @throws javax.jms.JMSException if thrown by JMS API methods */ T doInJms(Session session) throws JMSException; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index 1818540233..84e8a05520 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -54,21 +54,21 @@ import org.springframework.util.concurrent.ListenableFutureTask; * connection to the broker is opened and used exclusively for all messages from the * client that originated the CONNECT message. Messages from the same client are * identified through the session id message header. Reversely, when the STOMP broker - * sends messages back on the TCP connection, those messages are enriched with the session - * id of the client and sent back downstream through the {@link MessageChannel} provided - * to the constructor. + * sends messages back on the TCP connection, those messages are enriched with the + * session id of the client and sent back downstream through the {@link MessageChannel} + * provided to the constructor. * - *

This class also automatically opens a default "system" TCP connection to the message - * broker that is used for sending messages that originate from the server application (as - * opposed to from a client). Such messages are not associated with any client and - * therefore do not have a session id header. The "system" connection is effectively - * shared and cannot be used to receive messages. Several properties are provided to - * configure the "system" connection including: + *

This class also automatically opens a default "system" TCP connection to the + * message broker that is used for sending messages that originate from the server + * application (as opposed to from a client). Such messages are not associated with + * any client and therefore do not have a session id header. The "system" connection + * is effectively shared and cannot be used to receive messages. Several properties + * are provided to configure the "system" connection including: *

* * @author Rossen Stoyanchev @@ -79,23 +79,21 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler public static final String SYSTEM_SESSION_ID = "_system_"; + // STOMP recommends error of margin for receiving heartbeats + private static final long HEARTBEAT_MULTIPLIER = 3; + + /** + * A heartbeat is setup once a CONNECTED frame is received which contains the heartbeat settings + * we need. If we don't receive CONNECTED within a minute, the connection is closed proactively. + */ + private static final int MAX_TIME_TO_CONNECTED_FRAME = 60 * 1000; + private static final byte[] EMPTY_PAYLOAD = new byte[0]; private static final ListenableFutureTask EMPTY_TASK = new ListenableFutureTask(new VoidCallable()); - // STOMP recommends error of margin for receiving heartbeats - private static final long HEARTBEAT_MULTIPLIER = 3; - private static final Message HEARTBEAT_MESSAGE; - /** - * A heartbeat is setup once a CONNECTED frame is received which contains - * the heartbeat settings we need. If we don't receive CONNECTED within - * a minute, the connection is closed proactively. - */ - private static final int MAX_TIME_TO_CONNECTED_FRAME = 60 * 1000; - - static { EMPTY_TASK.run(); @@ -120,19 +118,19 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler private long systemHeartbeatReceiveInterval = 10000; - private String virtualHost; - private final Map systemSubscriptions = new HashMap(4); + private String virtualHost; + private TcpOperations tcpClient; private MessageHeaderInitializer headerInitializer; + private final Stats stats = new Stats(); + private final Map connectionHandlers = new ConcurrentHashMap(); - private final Stats stats = new Stats(); - /** * Create a StompBrokerRelayMessageHandler instance with the given message channels @@ -178,46 +176,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler public int getRelayPort() { return this.relayPort; } - - /** - * Set the interval, in milliseconds, at which the "system" connection will, in the - * absence of any other data being sent, send a heartbeat to the STOMP broker. A value - * of zero will prevent heartbeats from being sent to the broker. - *

The default value is 10000. - *

See class-level documentation for more information on the "system" connection. - */ - public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) { - this.systemHeartbeatSendInterval = systemHeartbeatSendInterval; - } - - /** - * Return the interval, in milliseconds, at which the "system" connection will - * send heartbeats to the STOMP broker. - */ - public long getSystemHeartbeatSendInterval() { - return this.systemHeartbeatSendInterval; - } - - /** - * Set the maximum interval, in milliseconds, at which the "system" connection - * expects, in the absence of any other data, to receive a heartbeat from the STOMP - * broker. A value of zero will configure the connection to expect not to receive - * heartbeats from the broker. - *

The default value is 10000. - *

See class-level documentation for more information on the "system" connection. - */ - public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) { - this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval; - } - - /** - * Return the interval, in milliseconds, at which the "system" connection expects - * to receive heartbeats from the STOMP broker. - */ - public long getSystemHeartbeatReceiveInterval() { - return this.systemHeartbeatReceiveInterval; - } - /** * Set the login to use when creating connections to the STOMP broker on * behalf of connected clients. @@ -293,6 +251,46 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler return this.systemPasscode; } + + /** + * Set the interval, in milliseconds, at which the "system" connection will, in the + * absence of any other data being sent, send a heartbeat to the STOMP broker. A value + * of zero will prevent heartbeats from being sent to the broker. + *

The default value is 10000. + *

See class-level documentation for more information on the "system" connection. + */ + public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) { + this.systemHeartbeatSendInterval = systemHeartbeatSendInterval; + } + + /** + * Return the interval, in milliseconds, at which the "system" connection will + * send heartbeats to the STOMP broker. + */ + public long getSystemHeartbeatSendInterval() { + return this.systemHeartbeatSendInterval; + } + + /** + * Set the maximum interval, in milliseconds, at which the "system" connection + * expects, in the absence of any other data, to receive a heartbeat from the STOMP + * broker. A value of zero will configure the connection to expect not to receive + * heartbeats from the broker. + *

The default value is 10000. + *

See class-level documentation for more information on the "system" connection. + */ + public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) { + this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval; + } + + /** + * Return the interval, in milliseconds, at which the "system" connection expects + * to receive heartbeats from the STOMP broker. + */ + public long getSystemHeartbeatReceiveInterval() { + return this.systemHeartbeatReceiveInterval; + } + /** * Configure one more destinations to subscribe to on the shared "system" * connection along with MessageHandler's to handle received messages. @@ -342,21 +340,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } /** - * Get the configured TCP client. Never {@code null} unless not configured + * Get the configured TCP client (never {@code null} unless not configured * invoked and this method is invoked before the handler is started and - * hence a default implementation initialized. + * hence a default implementation initialized). */ public TcpOperations getTcpClient() { return this.tcpClient; } - /** - * Return the current count of TCP connection to the broker. - */ - public int getConnectionCount() { - return this.connectionHandlers.size(); - } - /** * Configure a {@link MessageHeaderInitializer} to apply to the headers of all * messages created through the {@code StompBrokerRelayMessageHandler} that @@ -381,6 +372,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler return this.stats.toString(); } + /** + * Return the current count of TCP connection to the broker. + */ + public int getConnectionCount() { + return this.connectionHandlers.size(); + } + @Override protected void startInternal() { @@ -871,6 +869,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } + private class SystemStompConnectionHandler extends StompConnectionHandler { public SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) { @@ -1009,10 +1008,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } public String toString() { - return connectionHandlers.size() + " sessions, " + relayHost + ":" + relayPort + + return (connectionHandlers.size() + " sessions, " + relayHost + ":" + relayPort + (isBrokerAvailable() ? " (available)" : " (not available)") + ", processed CONNECT(" + this.connect.get() + ")-CONNECTED(" + - this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")"; + this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")"); } } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/adapter/AbstractWebSocketSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/AbstractWebSocketSession.java index 52ea8c7353..8e704b2195 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/adapter/AbstractWebSocketSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/AbstractWebSocketSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -91,7 +91,6 @@ public abstract class AbstractWebSocketSession implements NativeWebSocketSess @Override public final void sendMessage(WebSocketMessage message) throws IOException { - checkNativeSessionInitialized(); if (logger.isTraceEnabled()) { diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractClientSockJsSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractClientSockJsSession.java index c554b7d166..e32ff263c8 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractClientSockJsSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractClientSockJsSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,7 +34,6 @@ import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.sockjs.frame.SockJsFrame; -import org.springframework.web.socket.sockjs.frame.SockJsFrameType; import org.springframework.web.socket.sockjs.frame.SockJsMessageCodec; /** @@ -44,20 +43,19 @@ import org.springframework.web.socket.sockjs.frame.SockJsMessageCodec; * Sub-classes implement actual send as well as disconnect logic. * * @author Rossen Stoyanchev + * @author Juergen Hoeller * @since 4.1 */ public abstract class AbstractClientSockJsSession implements WebSocketSession { protected final Log logger = LogFactory.getLog(getClass()); - private final TransportRequest request; private final WebSocketHandler webSocketHandler; private final SettableListenableFuture connectFuture; - private final Map attributes = new ConcurrentHashMap(); private volatile State state = State.NEW; @@ -127,25 +125,31 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession { @Override public boolean isOpen() { - return State.OPEN.equals(this.state); + return (this.state == State.OPEN); } public boolean isDisconnected() { - return (State.CLOSING.equals(this.state) || State.CLOSED.equals(this.state)); + return (this.state == State.CLOSING || this.state == State.CLOSED); } @Override public final void sendMessage(WebSocketMessage message) throws IOException { - Assert.state(State.OPEN.equals(this.state), this + " is not open, current state=" + this.state); - Assert.isInstanceOf(TextMessage.class, message, this + " supports text messages only."); - String payload = ((TextMessage) message).getPayload(); - payload = getMessageCodec().encode(new String[] { payload }); - payload = payload.substring(1); // the client-side doesn't need message framing (letter "a") - message = new TextMessage(payload); - if (logger.isTraceEnabled()) { - logger.trace("Sending message " + message + " in " + this); + if (!(message instanceof TextMessage)) { + throw new IllegalArgumentException(this + " supports text messages only."); } - sendInternal((TextMessage) message); + if (this.state != State.OPEN) { + throw new IllegalStateException(this + " is not open: current state " + this.state); + } + + String payload = ((TextMessage) message).getPayload(); + payload = getMessageCodec().encode(payload); + payload = payload.substring(1); // the client-side doesn't need message framing (letter "a") + + TextMessage messageToSend = new TextMessage(payload); + if (logger.isTraceEnabled()) { + logger.trace("Sending message " + messageToSend + " in " + this); + } + sendInternal(messageToSend); } protected abstract void sendInternal(TextMessage textMessage) throws IOException; @@ -173,10 +177,13 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession { logger.warn("Ignoring close since connect() was never invoked"); return; } - if (State.CLOSING.equals(this.state) || State.CLOSED.equals(this.state)) { - logger.debug("Ignoring close (already closing or closed), current state=" + this.state); + if (isDisconnected()) { + if (logger.isDebugEnabled()) { + logger.debug("Ignoring close (already closing or closed): current state " + this.state); + } return; } + this.state = State.CLOSING; this.closeStatus = status; try { @@ -193,23 +200,20 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession { public void handleFrame(String payload) { SockJsFrame frame = new SockJsFrame(payload); - if (SockJsFrameType.OPEN.equals(frame.getType())) { - handleOpenFrame(); - } - else if (SockJsFrameType.MESSAGE.equals(frame.getType())) { - handleMessageFrame(frame); - } - else if (SockJsFrameType.CLOSE.equals(frame.getType())) { - handleCloseFrame(frame); - } - else if (SockJsFrameType.HEARTBEAT.equals(frame.getType())) { - if (logger.isTraceEnabled()) { - logger.trace("Received heartbeat in " + this); - } - } - else { - // should never happen - throw new IllegalStateException("Unknown SockJS frame type " + frame + " in " + this); + switch (frame.getType()) { + case OPEN: + handleOpenFrame(); + break; + case HEARTBEAT: + if (logger.isTraceEnabled()) { + logger.trace("Received heartbeat in " + this); + } + break; + case MESSAGE: + handleMessageFrame(frame); + break; + case CLOSE: + handleCloseFrame(frame); } } @@ -217,7 +221,7 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession { if (logger.isDebugEnabled()) { logger.debug("Processing SockJS open frame in " + this); } - if (State.NEW.equals(state)) { + if (this.state == State.NEW) { this.state = State.OPEN; try { this.webSocketHandler.afterConnectionEstablished(this); @@ -225,16 +229,14 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession { } catch (Throwable ex) { if (logger.isErrorEnabled()) { - Class type = this.webSocketHandler.getClass(); - logger.error(type + ".afterConnectionEstablished threw exception in " + this, ex); + logger.error("WebSocketHandler.afterConnectionEstablished threw exception in " + this, ex); } } } else { if (logger.isDebugEnabled()) { - logger.debug("Open frame received in " + getId() + " but we're not" + - "connecting (current state=" + this.state + "). The server might " + - "have been restarted and lost track of the session."); + logger.debug("Open frame received in " + getId() + " but we're not connecting (current state " + + this.state + "). The server might have been restarted and lost track of the session."); } closeInternal(new CloseStatus(1006, "Server lost session")); } @@ -243,10 +245,11 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession { private void handleMessageFrame(SockJsFrame frame) { if (!isOpen()) { if (logger.isErrorEnabled()) { - logger.error("Ignoring received message due to state=" + this.state + " in " + this); + logger.error("Ignoring received message due to state " + this.state + " in " + this); } return; } + String[] messages; try { messages = getMessageCodec().decode(frame.getFrameData()); @@ -258,18 +261,18 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession { closeInternal(CloseStatus.BAD_DATA); return; } + if (logger.isTraceEnabled()) { logger.trace("Processing SockJS message frame " + frame.getContent() + " in " + this); } for (String message : messages) { - try { - if (isOpen()) { + if (isOpen()) { + try { this.webSocketHandler.handleMessage(this, new TextMessage(message)); } - } - catch (Throwable ex) { - Class type = this.webSocketHandler.getClass(); - logger.error(type + ".handleMessage threw an exception on " + frame + " in " + this, ex); + catch (Throwable ex) { + logger.error("WebSocketHandler.handleMessage threw an exception on " + frame + " in " + this, ex); + } } } } @@ -300,18 +303,14 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession { } this.webSocketHandler.handleTransportError(this, error); } - catch (Exception ex) { - Class type = this.webSocketHandler.getClass(); - if (logger.isErrorEnabled()) { - logger.error(type + ".handleTransportError threw an exception", ex); - } + catch (Throwable ex) { + logger.error("WebSocketHandler.handleTransportError threw an exception", ex); } } public void afterTransportClosed(CloseStatus closeStatus) { this.closeStatus = (this.closeStatus != null ? this.closeStatus : closeStatus); Assert.state(this.closeStatus != null, "CloseStatus not available"); - if (logger.isDebugEnabled()) { logger.debug("Transport closed with " + this.closeStatus + " in " + this); } @@ -320,11 +319,8 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession { try { this.webSocketHandler.afterConnectionClosed(this, this.closeStatus); } - catch (Exception ex) { - if (logger.isErrorEnabled()) { - Class type = this.webSocketHandler.getClass(); - logger.error(type + ".afterConnectionClosed threw an exception", ex); - } + catch (Throwable ex) { + logger.error("WebSocketHandler.afterConnectionClosed threw an exception", ex); } } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/frame/SockJsFrame.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/frame/SockJsFrame.java index 50ba29f3c3..c0c574577d 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/frame/SockJsFrame.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/frame/SockJsFrame.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,7 +36,8 @@ public class SockJsFrame { private static final SockJsFrame CLOSE_GO_AWAY_FRAME = closeFrame(3000, "Go away!"); - private static final SockJsFrame CLOSE_ANOTHER_CONNECTION_OPEN_FRAME = closeFrame(2010, "Another connection still open"); + private static final SockJsFrame CLOSE_ANOTHER_CONNECTION_OPEN_FRAME = + closeFrame(2010, "Another connection still open"); private final SockJsFrameType type; @@ -46,10 +47,10 @@ public class SockJsFrame { /** * Create a new instance frame with the given frame content. - * @param content the content, must be a non-empty and represent a valid SockJS frame + * @param content the content (must be a non-empty and represent a valid SockJS frame) */ public SockJsFrame(String content) { - Assert.hasText(content); + Assert.hasText(content, "Content must not be empty"); if ("o".equals(content)) { this.type = SockJsFrameType.OPEN; this.content = content; @@ -71,10 +72,74 @@ public class SockJsFrame { this.content = (content.length() > 1 ? content : "c[]"); } else { - throw new IllegalArgumentException("Unexpected SockJS frame type in content=\"" + content + "\""); + throw new IllegalArgumentException("Unexpected SockJS frame type in content \"" + content + "\""); } } + + /** + * Return the SockJS frame type. + */ + public SockJsFrameType getType() { + return this.type; + } + + /** + * Return the SockJS frame content (never {@code null}). + */ + public String getContent() { + return this.content; + } + + /** + * Return the SockJS frame content as a byte array. + */ + public byte[] getContentBytes() { + return this.content.getBytes(CHARSET); + } + + /** + * Return data contained in a SockJS "message" and "close" frames. Otherwise + * for SockJS "open" and "close" frames, which do not contain data, return + * {@code null}. + */ + public String getFrameData() { + if (getType() == SockJsFrameType.OPEN || getType() == SockJsFrameType.HEARTBEAT) { + return null; + } + else { + return getContent().substring(1); + } + } + + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other instanceof SockJsFrame)) { + return false; + } + SockJsFrame otherFrame = (SockJsFrame) other; + return (this.type.equals(otherFrame.type) && this.content.equals(otherFrame.content)); + } + + @Override + public int hashCode() { + return this.content.hashCode(); + } + + @Override + public String toString() { + String result = this.content; + if (result.length() > 80) { + result = result.substring(0, 80) + "...(truncated)"; + } + return "SockJsFrame content='" + result.replace("\n", "\\n").replace("\r", "\\r") + "'"; + } + + public static SockJsFrame openFrame() { return OPEN_FRAME; } @@ -100,66 +165,4 @@ public class SockJsFrame { return new SockJsFrame("c[" + code + ",\"" + reason + "\"]"); } - - /** - * Return the SockJS frame type. - */ - public SockJsFrameType getType() { - return this.type; - } - - /** - * Return the SockJS frame content, never {@code null}. - */ - public String getContent() { - return this.content; - } - - /** - * Return the SockJS frame content as a byte array. - */ - public byte[] getContentBytes() { - return this.content.getBytes(CHARSET); - } - - /** - * Return data contained in a SockJS "message" and "close" frames. Otherwise - * for SockJS "open" and "close" frames, which do not contain data, return - * {@code null}. - */ - public String getFrameData() { - if (SockJsFrameType.OPEN == getType() || SockJsFrameType.HEARTBEAT == getType()) { - return null; - } - else { - return getContent().substring(1); - } - } - - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (!(other instanceof SockJsFrame)) { - return false; - } - return (this.type.equals(((SockJsFrame) other).type) && this.content.equals(((SockJsFrame) other).content)); - } - - @Override - public int hashCode() { - return this.content.hashCode(); - } - - @Override - public String toString() { - String result = this.content; - if (result.length() > 80) { - result = result.substring(0, 80) + "...(truncated)"; - } - return "SockJsFrame content='" + result.replace("\n", "\\n").replace("\r", "\\r") + "'"; - } - }