Polishing

This commit is contained in:
Juergen Hoeller
2014-09-04 00:55:38 +02:00
parent 8543a5548e
commit 97bd0ccfec
10 changed files with 319 additions and 445 deletions

View File

@@ -41,8 +41,10 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.ListenableFutureTask;
/**
* A {@link org.springframework.messaging.MessageHandler} that handles messages by forwarding them to a STOMP broker.
* For each new {@link SimpMessageType#CONNECT CONNECT} message, an independent TCP
* A {@link org.springframework.messaging.MessageHandler} that handles messages by
* forwarding them to a STOMP broker.
*
* <p>For each new {@link SimpMessageType#CONNECT CONNECT} message, an independent TCP
* connection to the broker is opened and used exclusively for all messages from the
* client that originated the CONNECT message. Messages from the same client are
* identified through the session id message header. Reversely, when the STOMP broker
@@ -57,10 +59,10 @@ import org.springframework.util.concurrent.ListenableFutureTask;
* shared and cannot be used to receive messages. Several properties are provided to
* configure the "system" connection including:
* <ul>
* <li>{@link #setSystemLogin(String)}</li>
* <li>{@link #setSystemPasscode(String)}</li>
* <li>{@link #setSystemHeartbeatSendInterval(long)}</li>
* <li>{@link #setSystemHeartbeatReceiveInterval(long)}</li>
* <li>{@link #setSystemLogin(String)}</li>
* <li>{@link #setSystemPasscode(String)}</li>
* <li>{@link #setSystemHeartbeatSendInterval(long)}</li>
* <li>{@link #setSystemHeartbeatReceiveInterval(long)}</li>
* </ul>
*
* @author Rossen Stoyanchev
@@ -84,10 +86,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private static final Message<byte[]> HEARTBEAT_MESSAGE;
static {
EMPTY_TASK.run();
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.HEARTBEAT);
HEARTBEAT_MESSAGE = MessageBuilder.withPayload(new byte[] {'\n'}).setHeaders(headers).build();
EMPTY_TASK.run();
}
@@ -124,7 +127,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
/**
* Create a StompBrokerRelayMessageHandler instance with the given message channels
* and destination prefixes.
*
* @param clientInChannel the channel for receiving messages from clients (e.g. WebSocket clients)
* @param clientOutChannel the channel for sending messages to clients (e.g. WebSocket clients)
* @param brokerChannel the channel for the application to send messages to the broker
@@ -135,11 +137,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
SubscribableChannel brokerChannel, Collection<String> destinationPrefixes) {
super(destinationPrefixes);
Assert.notNull(clientInChannel, "'clientInChannel' must not be null");
Assert.notNull(clientOutChannel, "'clientOutChannel' must not be null");
Assert.notNull(brokerChannel, "'brokerChannel' must not be null");
this.clientInboundChannel = clientInChannel;
this.clientOutboundChannel = clientOutChannel;
this.brokerChannel = brokerChannel;
@@ -155,7 +155,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
/**
* @return the STOMP message broker host.
* Return the STOMP message broker host.
*/
public String getRelayHost() {
return this.relayHost;
@@ -169,7 +169,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
/**
* @return the STOMP message broker port.
* Return the STOMP message broker port.
*/
public int getRelayPort() {
return this.relayPort;
@@ -187,7 +187,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
/**
* @return The interval, in milliseconds, at which the "system" connection will
* Return the interval, in milliseconds, at which the "system" connection will
* send heartbeats to the STOMP broker.
*/
public long getSystemHeartbeatSendInterval() {
@@ -207,7 +207,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
/**
* @return The interval, in milliseconds, at which the "system" connection expects
* Return the interval, in milliseconds, at which the "system" connection expects
* to receive heartbeats from the STOMP broker.
*/
public long getSystemHeartbeatReceiveInterval() {
@@ -217,8 +217,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
/**
* Set the login to use when creating connections to the STOMP broker on
* behalf of connected clients.
* <p>
* By default this is set to "guest".
* <p>By default this is set to "guest".
* @see #setSystemLogin(String)
*/
public void setClientLogin(String clientLogin) {
@@ -227,7 +226,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
/**
* @return the configured login to use for connections to the STOMP broker
* Return the configured login to use for connections to the STOMP broker
* on behalf of connected clients.
* @see #getSystemLogin()
*/
@@ -236,11 +235,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
/**
* Set the clientPasscode to use to create connections to the STOMP broker on
* Set the client passcode to use to create connections to the STOMP broker on
* behalf of connected clients.
* <p>
* By default this is set to "guest".
* @see #setSystemPasscode(String)
* <p>By default this is set to "guest".
* @see #setSystemPasscode
*/
public void setClientPasscode(String clientPasscode) {
Assert.hasText(clientPasscode, "clientPasscode must not be empty");
@@ -248,7 +246,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
/**
* @return the configured passocde to use for connections to the STOMP broker on
* Return the configured passcode to use for connections to the STOMP broker on
* behalf of connected clients.
* @see #getSystemPasscode()
*/
@@ -260,8 +258,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
* Set the login for the shared "system" connection used to send messages to
* the STOMP broker from within the application, i.e. messages not associated
* with a specific client session (e.g. REST/HTTP request handling method).
* <p>
* By default this is set to "guest".
* <p>By default this is set to "guest".
*/
public void setSystemLogin(String systemLogin) {
Assert.hasText(systemLogin, "systemLogin must not be empty");
@@ -269,7 +266,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
/**
* @return the login used for the shared "system" connection to the STOMP broker
* Return the login used for the shared "system" connection to the STOMP broker.
*/
public String getSystemLogin() {
return this.systemLogin;
@@ -279,15 +276,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
* Set the passcode for the shared "system" connection used to send messages to
* the STOMP broker from within the application, i.e. messages not associated
* with a specific client session (e.g. REST/HTTP request handling method).
* <p>
* By default this is set to "guest".
* <p>By default this is set to "guest".
*/
public void setSystemPasscode(String systemPasscode) {
this.systemPasscode = systemPasscode;
}
/**
* @return the passcode used for the shared "system" connection to the STOMP broker
* Return the passcode used for the shared "system" connection to the STOMP broker.
*/
public String getSystemPasscode() {
return this.systemPasscode;
@@ -306,7 +302,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
/**
* @return the configured virtual host value.
* Return the configured virtual host value.
*/
public String getVirtualHost() {
return this.virtualHost;
@@ -367,7 +363,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@Override
protected void stopInternal() {
publishBrokerUnavailableEvent();
this.clientInboundChannel.unsubscribe(this);
@@ -376,19 +371,18 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
try {
this.tcpClient.shutdown().get(5000, TimeUnit.MILLISECONDS);
}
catch (Throwable t) {
logger.error("Error while shutting down TCP client", t);
catch (Throwable ex) {
logger.error("Error in shutdown of TCP client", ex);
}
}
@Override
protected void handleMessageInternal(Message<?> message) {
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
String sessionId = headers.getSessionId();
if (!isBrokerAvailable()) {
if (sessionId == null || sessionId == SystemStompConnectionHandler.SESSION_ID) {
if (sessionId == null || sessionId.equals(SystemStompConnectionHandler.SESSION_ID)) {
throw new MessageDeliveryException("Message broker is not active.");
}
if (SimpMessageType.CONNECT.equals(headers.getMessageType()) && logger.isErrorEnabled()) {
@@ -418,7 +412,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return;
}
if ((command != null) && command.requiresDestination() && !checkDestinationPrefix(destination)) {
if (command != null && command.requiresDestination() && !checkDestinationPrefix(destination)) {
if (logger.isTraceEnabled()) {
logger.trace("Ignoring message to destination=" + destination);
}
@@ -482,15 +476,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
this(sessionId, connectHeaders, true);
}
private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders,
boolean isRemoteClientSession) {
Assert.notNull(sessionId, "SessionId must not be null");
Assert.notNull(connectHeaders, "ConnectHeaders must not be null");
private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders, boolean isClientSession) {
Assert.notNull(sessionId, "'sessionId' must not be null");
Assert.notNull(connectHeaders, "'connectHeaders' must not be null");
this.sessionId = sessionId;
this.connectHeaders = connectHeaders;
this.isRemoteClientSession = isRemoteClientSession;
this.isRemoteClientSession = isClientSession;
}
public String getSessionId() {
@@ -513,7 +504,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
/**
* Invoked when any TCP connectivity issue is detected, i.e. failure to establish
* the TCP connection, failure to send a message, missed heartbeat.
* the TCP connection, failure to send a message, missed heartbeat, etc.
*/
protected void handleTcpConnectionFailure(String errorMessage, Throwable ex) {
if (logger.isErrorEnabled()) {
@@ -526,9 +517,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
try {
clearConnection();
}
catch (Throwable t) {
catch (Throwable ex2) {
if (logger.isErrorEnabled()) {
logger.error("Failed to close connection: " + t.getMessage());
logger.error("Failed to close connection: " + ex2.getMessage());
}
}
}
@@ -552,7 +543,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@Override
public void handleMessage(Message<byte[]> message) {
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
headers.setSessionId(this.sessionId);
@@ -584,7 +574,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
private void initHeartbeats(StompHeaderAccessor connectedHeaders) {
// Remote clients do their own heartbeat management
if (this.isRemoteClientSession) {
return;
@@ -592,11 +581,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
long clientSendInterval = this.connectHeaders.getHeartbeat()[0];
long clientReceiveInterval = this.connectHeaders.getHeartbeat()[1];
long serverSendInterval = connectedHeaders.getHeartbeat()[0];
long serverReceiveInterval = connectedHeaders.getHeartbeat()[1];
if ((clientSendInterval > 0) && (serverReceiveInterval > 0)) {
if (clientSendInterval > 0 && serverReceiveInterval > 0) {
long interval = Math.max(clientSendInterval, serverReceiveInterval);
this.tcpConnection.onWriteInactivity(new Runnable() {
@Override
@@ -605,10 +593,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
if (conn != null) {
conn.send(HEARTBEAT_MESSAGE).addCallback(
new ListenableFutureCallback<Void>() {
public void onFailure(Throwable t) {
handleTcpConnectionFailure("Failed to send heartbeat", t);
public void onSuccess(Void result) {
}
public void onFailure(Throwable ex) {
handleTcpConnectionFailure("Failed to send heartbeat", ex);
}
public void onSuccess(Void result) {}
});
}
}
@@ -620,8 +609,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
this.tcpConnection.onReadInactivity(new Runnable() {
@Override
public void run() {
handleTcpConnectionFailure("No hearbeat from broker for more than " +
interval + "ms, closing connection", null);
handleTcpConnectionFailure("No heartbeat from broker for more than " + interval +
"ms, closing connection", null);
}
}, interval);
}
@@ -652,17 +641,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
this.tcpConnection = null;
clearConnection();
}
catch (Throwable t) {
if (logger.isErrorEnabled()) {
// Ignore
}
catch (Throwable ex) {
// Shouldn't happen with connection reset beforehand
}
}
}
/**
* Forward the given message to the STOMP broker.
*
* <p>The method checks whether we have an active TCP connection and have
* received the STOMP CONNECTED frame. For client messages this should be
* false only if we lose the TCP connection around the same time when a
@@ -671,7 +657,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
* the "system" connection an exception is raised so that components sending
* the message have a chance to handle it -- by default the broker message
* channel is synchronous.
*
* <p>Note that if messages arrive concurrently around the same time a TCP
* connection is lost, there is a brief period of time before the connection
* is reset when one or more messages may sneak through and an attempt made
@@ -679,13 +664,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
* method simply lets them try and fail. For client sessions that may
* result in an additional STOMP ERROR frame(s) being sent downstream but
* code handling that downstream should be idempotent in such cases.
*
* @param message the message to send, never {@code null}
* @param message the message to send (never {@code null})
* @return a future to wait for the result
*/
@SuppressWarnings("unchecked")
public ListenableFuture<Void> forward(final Message<?> message) {
TcpConnection<byte[]> conn = this.tcpConnection;
if (!this.isStompConnected) {
@@ -725,12 +708,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Throwable ex) {
if (tcpConnection == null) {
// already reset
}
else {
handleTcpConnectionFailure("Failed to send message " + message, t);
handleTcpConnectionFailure("Failed to send message " + message, ex);
}
}
});
@@ -742,13 +725,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
* Close the TCP connection to the broker and release the connection reference,
* Any exception arising from closing the connection is propagated. The caller
* must handle and log the exception accordingly.
*
* <p>If the connection belongs to a client session, the connection handler
* for the session (basically the current instance) is also released from the
* {@link org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler}.
*/
public void clearConnection() {
if (this.isRemoteClientSession) {
if (logger.isDebugEnabled()) {
logger.debug("Removing session '" + sessionId + "' (total remaining=" +
@@ -772,11 +753,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
private class SystemStompConnectionHandler extends StompConnectionHandler {
public static final String SESSION_ID = "stompRelaySystemSessionId";
public SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) {
super(SESSION_ID, connectHeaders, false);
}
@@ -788,8 +769,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
@Override
protected void handleTcpConnectionFailure(String errorMessage, Throwable t) {
super.handleTcpConnectionFailure(errorMessage, t);
protected void handleTcpConnectionFailure(String errorMessage, Throwable ex) {
super.handleTcpConnectionFailure(errorMessage, ex);
publishBrokerUnavailableEvent();
}
@@ -806,12 +787,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
future.get();
return future;
}
catch (Throwable t) {
throw new MessageDeliveryException(message, t);
catch (Throwable ex) {
throw new MessageDeliveryException(message, ex);
}
}
}
private static class Reactor11TcpClientFactory {
public TcpOperations<byte[]> create(String host, int port) {
@@ -820,6 +802,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
private static class Reactor10TcpClientFactory {
public TcpOperations<byte[]> create(String host, int port) {
@@ -828,6 +811,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
private static class VoidCallable implements Callable<Void> {
@Override