Polishing

(cherry picked from commit f095aa2)
This commit is contained in:
Juergen Hoeller
2017-01-23 21:28:40 +01:00
parent bddcc669b3
commit fcfacd9f83
17 changed files with 259 additions and 262 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -54,21 +54,21 @@ import org.springframework.util.concurrent.ListenableFutureTask;
* connection to the broker is opened and used exclusively for all messages from the
* client that originated the CONNECT message. Messages from the same client are
* identified through the session id message header. Reversely, when the STOMP broker
* sends messages back on the TCP connection, those messages are enriched with the session
* id of the client and sent back downstream through the {@link MessageChannel} provided
* to the constructor.
* sends messages back on the TCP connection, those messages are enriched with the
* session id of the client and sent back downstream through the {@link MessageChannel}
* provided to the constructor.
*
* <p>This class also automatically opens a default "system" TCP connection to the message
* broker that is used for sending messages that originate from the server application (as
* opposed to from a client). Such messages are not associated with any client and
* therefore do not have a session id header. The "system" connection is effectively
* shared and cannot be used to receive messages. Several properties are provided to
* configure the "system" connection including:
* <p>This class also automatically opens a default "system" TCP connection to the
* message broker that is used for sending messages that originate from the server
* application (as opposed to from a client). Such messages are not associated with
* any client and therefore do not have a session id header. The "system" connection
* is effectively shared and cannot be used to receive messages. Several properties
* are provided to configure the "system" connection including:
* <ul>
* <li>{@link #setSystemLogin(String)}</li>
* <li>{@link #setSystemPasscode(String)}</li>
* <li>{@link #setSystemHeartbeatSendInterval(long)}</li>
* <li>{@link #setSystemHeartbeatReceiveInterval(long)}</li>
* <li>{@link #setSystemLogin}</li>
* <li>{@link #setSystemPasscode}</li>
* <li>{@link #setSystemHeartbeatSendInterval}</li>
* <li>{@link #setSystemHeartbeatReceiveInterval}</li>
* </ul>
*
* @author Rossen Stoyanchev
@@ -79,23 +79,21 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
public static final String SYSTEM_SESSION_ID = "_system_";
// STOMP recommends error of margin for receiving heartbeats
private static final long HEARTBEAT_MULTIPLIER = 3;
/**
* A heartbeat is setup once a CONNECTED frame is received which contains the heartbeat settings
* we need. If we don't receive CONNECTED within a minute, the connection is closed proactively.
*/
private static final int MAX_TIME_TO_CONNECTED_FRAME = 60 * 1000;
private static final byte[] EMPTY_PAYLOAD = new byte[0];
private static final ListenableFutureTask<Void> EMPTY_TASK = new ListenableFutureTask<Void>(new VoidCallable());
// STOMP recommends error of margin for receiving heartbeats
private static final long HEARTBEAT_MULTIPLIER = 3;
private static final Message<byte[]> HEARTBEAT_MESSAGE;
/**
* A heartbeat is setup once a CONNECTED frame is received which contains
* the heartbeat settings we need. If we don't receive CONNECTED within
* a minute, the connection is closed proactively.
*/
private static final int MAX_TIME_TO_CONNECTED_FRAME = 60 * 1000;
static {
EMPTY_TASK.run();
@@ -120,19 +118,19 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private long systemHeartbeatReceiveInterval = 10000;
private String virtualHost;
private final Map<String, MessageHandler> systemSubscriptions = new HashMap<String, MessageHandler>(4);
private String virtualHost;
private TcpOperations<byte[]> tcpClient;
private MessageHeaderInitializer headerInitializer;
private final Stats stats = new Stats();
private final Map<String, StompConnectionHandler> connectionHandlers =
new ConcurrentHashMap<String, StompConnectionHandler>();
private final Stats stats = new Stats();
/**
* Create a StompBrokerRelayMessageHandler instance with the given message channels
@@ -178,46 +176,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
public int getRelayPort() {
return this.relayPort;
}
/**
* Set the interval, in milliseconds, at which the "system" connection will, in the
* absence of any other data being sent, send a heartbeat to the STOMP broker. A value
* of zero will prevent heartbeats from being sent to the broker.
* <p>The default value is 10000.
* <p>See class-level documentation for more information on the "system" connection.
*/
public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) {
this.systemHeartbeatSendInterval = systemHeartbeatSendInterval;
}
/**
* Return the interval, in milliseconds, at which the "system" connection will
* send heartbeats to the STOMP broker.
*/
public long getSystemHeartbeatSendInterval() {
return this.systemHeartbeatSendInterval;
}
/**
* Set the maximum interval, in milliseconds, at which the "system" connection
* expects, in the absence of any other data, to receive a heartbeat from the STOMP
* broker. A value of zero will configure the connection to expect not to receive
* heartbeats from the broker.
* <p>The default value is 10000.
* <p>See class-level documentation for more information on the "system" connection.
*/
public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) {
this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval;
}
/**
* Return the interval, in milliseconds, at which the "system" connection expects
* to receive heartbeats from the STOMP broker.
*/
public long getSystemHeartbeatReceiveInterval() {
return this.systemHeartbeatReceiveInterval;
}
/**
* Set the login to use when creating connections to the STOMP broker on
* behalf of connected clients.
@@ -293,6 +251,46 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return this.systemPasscode;
}
/**
* Set the interval, in milliseconds, at which the "system" connection will, in the
* absence of any other data being sent, send a heartbeat to the STOMP broker. A value
* of zero will prevent heartbeats from being sent to the broker.
* <p>The default value is 10000.
* <p>See class-level documentation for more information on the "system" connection.
*/
public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) {
this.systemHeartbeatSendInterval = systemHeartbeatSendInterval;
}
/**
* Return the interval, in milliseconds, at which the "system" connection will
* send heartbeats to the STOMP broker.
*/
public long getSystemHeartbeatSendInterval() {
return this.systemHeartbeatSendInterval;
}
/**
* Set the maximum interval, in milliseconds, at which the "system" connection
* expects, in the absence of any other data, to receive a heartbeat from the STOMP
* broker. A value of zero will configure the connection to expect not to receive
* heartbeats from the broker.
* <p>The default value is 10000.
* <p>See class-level documentation for more information on the "system" connection.
*/
public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) {
this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval;
}
/**
* Return the interval, in milliseconds, at which the "system" connection expects
* to receive heartbeats from the STOMP broker.
*/
public long getSystemHeartbeatReceiveInterval() {
return this.systemHeartbeatReceiveInterval;
}
/**
* Configure one more destinations to subscribe to on the shared "system"
* connection along with MessageHandler's to handle received messages.
@@ -342,21 +340,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
/**
* Get the configured TCP client. Never {@code null} unless not configured
* Get the configured TCP client (never {@code null} unless not configured
* invoked and this method is invoked before the handler is started and
* hence a default implementation initialized.
* hence a default implementation initialized).
*/
public TcpOperations<byte[]> getTcpClient() {
return this.tcpClient;
}
/**
* Return the current count of TCP connection to the broker.
*/
public int getConnectionCount() {
return this.connectionHandlers.size();
}
/**
* Configure a {@link MessageHeaderInitializer} to apply to the headers of all
* messages created through the {@code StompBrokerRelayMessageHandler} that
@@ -381,6 +372,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return this.stats.toString();
}
/**
* Return the current count of TCP connection to the broker.
*/
public int getConnectionCount() {
return this.connectionHandlers.size();
}
@Override
protected void startInternal() {
@@ -871,6 +869,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
private class SystemStompConnectionHandler extends StompConnectionHandler {
public SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) {
@@ -1009,10 +1008,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
public String toString() {
return connectionHandlers.size() + " sessions, " + relayHost + ":" + relayPort +
return (connectionHandlers.size() + " sessions, " + relayHost + ":" + relayPort +
(isBrokerAvailable() ? " (available)" : " (not available)") +
", processed CONNECT(" + this.connect.get() + ")-CONNECTED(" +
this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")";
this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")");
}
}