Fine tune STOMP and WebSocket related logging

Optimize logging with tracking the opening and closing of WebSocket
sessions and STOMP broker connections in mind.

While the volume of messages makes it impractical to log every message
at anything higher than TRACE, the opening and closing of connections
is more manageable and can be logged at INFO. This makes it possible to
drop to INFO in production and get useful information without getting
too much in a short period of time.

The logging is also optimized to avoid providing the same information
from multiple places since messages pass through multiple layers.

Issue: SPR-11884
This commit is contained in:
Rossen Stoyanchev
2014-06-27 03:39:47 -04:00
parent b2f6445058
commit 113fd1180a
34 changed files with 480 additions and 490 deletions

View File

@@ -325,20 +325,16 @@ public abstract class AbstractMethodMessageHandler<T>
String destination = getDestination(message);
if (destination == null) {
logger.trace("Ignoring message, no destination");
return;
}
String lookupDestination = getLookupDestination(destination);
if (lookupDestination == null) {
if (logger.isTraceEnabled()) {
logger.trace("Ignoring message to destination=" + destination);
}
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Handling message, lookupDestination=" + lookupDestination);
logger.debug("Handling message to " + destination);
}
MessageHeaderAccessor headerAccessor = MessageHeaderAccessor.getMutableAccessor(message);
@@ -397,7 +393,8 @@ public abstract class AbstractMethodMessageHandler<T>
Collections.sort(matches, comparator);
if (logger.isTraceEnabled()) {
logger.trace("Found " + matches.size() + " matching mapping(s) for [" + lookupDestination + "] : " + matches);
logger.trace("Found " + matches.size() + " matching mapping(s) for [" +
lookupDestination + "] : " + matches);
}
Match bestMatch = matches.get(0);
@@ -507,7 +504,7 @@ public abstract class AbstractMethodMessageHandler<T>
protected void handleNoMatch(Set<T> ts, String lookupDestination, Message<?> message) {
if (logger.isDebugEnabled()) {
logger.debug("No matching method found");
logger.debug("No matching method found.");
}
}

View File

@@ -62,6 +62,22 @@ public class SimpAttributes {
this.attributes = attributes;
}
/**
* Extract the SiMP session attributes from the given message, wrap them in
* a {@link SimpAttributes} instance.
* @param message the message to extract session attributes from
*/
public static SimpAttributes fromMessage(Message<?> message) {
Assert.notNull(message);
MessageHeaders headers = message.getHeaders();
String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
Map<String, Object> sessionAttributes = SimpMessageHeaderAccessor.getSessionAttributes(headers);
if (sessionId == null || sessionAttributes == null) {
throw new IllegalStateException(
"Message does not contain SiMP session id or attributes: " + message);
}
return new SimpAttributes(sessionId, sessionAttributes);
}
/**
* Return the value for the attribute of the given name, if any.
@@ -175,22 +191,4 @@ public class SimpAttributes {
}
}
/**
* Extract the SiMP session attributes from the given message, wrap them in
* a {@link SimpAttributes} instance.
* @param message the message to extract session attributes from
*/
public static SimpAttributes fromMessage(Message<?> message) {
Assert.notNull(message);
MessageHeaders headers = message.getHeaders();
String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
Map<String, Object> sessionAttributes = SimpMessageHeaderAccessor.getSessionAttributes(headers);
if (sessionId == null || sessionAttributes == null) {
throw new IllegalStateException(
"Message does not contain SiMP session id or attributes: " + message);
}
return new SimpAttributes(sessionId, sessionAttributes);
}
}

View File

@@ -133,9 +133,7 @@ public class SendToMethodReturnValueHandler implements HandlerMethodReturnValueH
}
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType, Message<?> message)
throws Exception {
public void handleReturnValue(Object returnValue, MethodParameter returnType, Message<?> message) throws Exception {
if (returnValue == null) {
return;
}

View File

@@ -187,7 +187,7 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan
* The configured Validator instance
*/
public Validator getValidator() {
return validator;
return this.validator;
}
/**
@@ -315,25 +315,23 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan
@Override
protected SimpMessageMappingInfo getMappingForMethod(Method method, Class<?> handlerType) {
MessageMapping typeAnnot = AnnotationUtils.findAnnotation(handlerType, MessageMapping.class);
MessageMapping typeAnnotation = AnnotationUtils.findAnnotation(handlerType, MessageMapping.class);
MessageMapping messageAnnot = AnnotationUtils.findAnnotation(method, MessageMapping.class);
if (messageAnnot != null) {
SimpMessageMappingInfo result = createMessageMappingCondition(messageAnnot);
if (typeAnnot != null) {
result = createMessageMappingCondition(typeAnnot).combine(result);
if (typeAnnotation != null) {
result = createMessageMappingCondition(typeAnnotation).combine(result);
}
return result;
}
SubscribeMapping subsribeAnnot = AnnotationUtils.findAnnotation(method, SubscribeMapping.class);
if (subsribeAnnot != null) {
SimpMessageMappingInfo result = createSubscribeCondition(subsribeAnnot);
if (typeAnnot != null) {
result = createMessageMappingCondition(typeAnnot).combine(result);
SubscribeMapping subsribeAnnotation = AnnotationUtils.findAnnotation(method, SubscribeMapping.class);
if (subsribeAnnotation != null) {
SimpMessageMappingInfo result = createSubscribeCondition(subsribeAnnotation);
if (typeAnnotation != null) {
result = createMessageMappingCondition(typeAnnotation).combine(result);
}
return result;
}
return null;
}

View File

@@ -43,12 +43,17 @@ public abstract class AbstractBrokerMessageHandler
protected final Log logger = LogFactory.getLog(getClass());
private final Collection<String> destinationPrefixes;
private ApplicationEventPublisher eventPublisher;
private AtomicBoolean brokerAvailable = new AtomicBoolean(false);
private final BrokerAvailabilityEvent availableEvent = new BrokerAvailabilityEvent(true, this);
private final BrokerAvailabilityEvent notAvailableEvent = new BrokerAvailabilityEvent(false, this);
private boolean autoStartup = true;
private Object lifecycleMonitor = new Object();
@@ -128,12 +133,12 @@ public abstract class AbstractBrokerMessageHandler
public final void start() {
synchronized (this.lifecycleMonitor) {
if (logger.isDebugEnabled()) {
logger.debug("Starting");
logger.debug("Starting...");
}
startInternal();
this.running = true;
if (logger.isDebugEnabled()) {
logger.debug("Started");
logger.debug("Started.");
}
}
}
@@ -145,12 +150,12 @@ public abstract class AbstractBrokerMessageHandler
public final void stop() {
synchronized (this.lifecycleMonitor) {
if (logger.isDebugEnabled()) {
logger.debug("Stopping");
logger.debug("Stopping...");
}
stopInternal();
this.running = false;
if (logger.isDebugEnabled()) {
logger.debug("Stopped");
logger.debug("Stopped.");
}
}
}
@@ -170,7 +175,7 @@ public abstract class AbstractBrokerMessageHandler
public final void handleMessage(Message<?> message) {
if (!this.running) {
if (logger.isTraceEnabled()) {
logger.trace("Message broker is not running. Ignoring message=" + message);
logger.trace(this + " not running yet. Ignoring " + message);
}
return;
}
@@ -194,20 +199,20 @@ public abstract class AbstractBrokerMessageHandler
protected void publishBrokerAvailableEvent() {
boolean shouldPublish = this.brokerAvailable.compareAndSet(false, true);
if (this.eventPublisher != null && shouldPublish) {
if (logger.isDebugEnabled()) {
logger.debug("Publishing BrokerAvailabilityEvent (available)");
if (logger.isInfoEnabled()) {
logger.info("Publishing " + this.availableEvent);
}
this.eventPublisher.publishEvent(new BrokerAvailabilityEvent(true, this));
this.eventPublisher.publishEvent(this.availableEvent);
}
}
protected void publishBrokerUnavailableEvent() {
boolean shouldPublish = this.brokerAvailable.compareAndSet(true, false);
if (this.eventPublisher != null && shouldPublish) {
if (logger.isDebugEnabled()) {
logger.debug("Publishing BrokerAvailabilityEvent (unavailable)");
if (logger.isInfoEnabled()) {
logger.info("Publishing " + this.notAvailableEvent);
}
this.eventPublisher.publishEvent(new BrokerAvailabilityEvent(false, this));
this.eventPublisher.publishEvent(this.notAvailableEvent);
}
}

View File

@@ -22,6 +22,7 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.util.Assert;
import org.springframework.util.MultiValueMap;
/**
@@ -39,32 +40,24 @@ public abstract class AbstractSubscriptionRegistry implements SubscriptionRegist
@Override
public final void registerSubscription(Message<?> message) {
MessageHeaders headers = message.getHeaders();
SimpMessageType type = SimpMessageHeaderAccessor.getMessageType(headers);
if (!SimpMessageType.SUBSCRIBE.equals(type)) {
logger.error("Expected SUBSCRIBE message: " + message);
return;
}
SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
Assert.isTrue(SimpMessageType.SUBSCRIBE.equals(messageType), "Expected SUBSCRIBE: " + message);
String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
if (sessionId == null) {
logger.error("Ignoring subscription. No sessionId in message: " + message);
logger.error("No sessionId in " + message);
return;
}
String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers);
if (subscriptionId == null) {
logger.error("Ignoring subscription. No subscriptionId in message: " + message);
logger.error("No subscriptionId in " + message);
return;
}
String destination = SimpMessageHeaderAccessor.getDestination(headers);
if (destination == null) {
logger.error("Ignoring destination. No destination in message: " + message);
logger.error("No destination in " + message);
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Adding subscription id=" + subscriptionId + ", destination=" + destination);
}
addSubscriptionInternal(sessionId, subscriptionId, destination, message);
}
@@ -73,27 +66,19 @@ public abstract class AbstractSubscriptionRegistry implements SubscriptionRegist
@Override
public final void unregisterSubscription(Message<?> message) {
MessageHeaders headers = message.getHeaders();
SimpMessageType type = SimpMessageHeaderAccessor.getMessageType(headers);
if (!SimpMessageType.UNSUBSCRIBE.equals(type)) {
logger.error("Expected UNSUBSCRIBE message: " + message);
return;
}
SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
Assert.isTrue(SimpMessageType.UNSUBSCRIBE.equals(messageType), "Expected UNSUBSCRIBE: " + message);
String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
if (sessionId == null) {
logger.error("Ignoring subscription. No sessionId in message: " + message);
logger.error("No sessionId in " + message);
return;
}
String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers);
if (subscriptionId == null) {
logger.error("Ignoring subscription. No subscriptionId in message: " + message);
logger.error("No subscriptionId " + message);
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Unubscribe request: " + message);
}
removeSubscriptionInternal(sessionId, subscriptionId, message);
}
@@ -104,24 +89,15 @@ public abstract class AbstractSubscriptionRegistry implements SubscriptionRegist
@Override
public final MultiValueMap<String, String> findSubscriptions(Message<?> message) {
MessageHeaders headers = message.getHeaders();
SimpMessageType type = SimpMessageHeaderAccessor.getMessageType(headers);
if (!SimpMessageType.MESSAGE.equals(type)) {
logger.trace("Ignoring message type " + type);
return null;
}
Assert.isTrue(SimpMessageType.MESSAGE.equals(type), "Unexpected message type: " + type);
String destination = SimpMessageHeaderAccessor.getDestination(headers);
if (destination == null) {
logger.trace("Ignoring message, no destination");
logger.error("No destination in " + message);
return null;
}
MultiValueMap<String, String> result = findSubscriptionsInternal(destination, message);
if (logger.isTraceEnabled()) {
logger.trace("Found " + result.size() + " subscriptions for destination=" + destination);
}
return result;
return findSubscriptionsInternal(destination, message);
}
protected abstract MultiValueMap<String, String> findSubscriptionsInternal(

View File

@@ -50,7 +50,7 @@ public class BrokerAvailabilityEvent extends ApplicationEvent {
@Override
public String toString() {
return "BrokerAvailabilityEvent=" + this.brokerAvailable;
return "BrokerAvailabilityEvent[available=" + this.brokerAvailable + ", " + getSource() + "]";
}
}

View File

@@ -106,9 +106,6 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
public void unregisterAllSubscriptions(String sessionId) {
SessionSubscriptionInfo info = this.subscriptionRegistry.removeSubscriptions(sessionId);
if (info != null) {
if (logger.isDebugEnabled()) {
logger.debug("Unregistering subscriptions for sessionId=" + sessionId);
}
this.destinationCache.updateAfterRemovedSession(info);
}
}
@@ -137,8 +134,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
@Override
public String toString() {
return "[destinationCache=" + this.destinationCache + ", subscriptionRegistry="
+ this.subscriptionRegistry + "]";
return "DefaultSubscriptionRegistry[" + this.destinationCache + ", " + this.subscriptionRegistry + "]";
}
@@ -241,7 +237,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
@Override
public String toString() {
return "[cache=" + this.accessCache + "]";
return "cache[" + this.accessCache.size() + " destination(s)]";
}
}
@@ -282,7 +278,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
@Override
public String toString() {
return "[sessions=" + sessions + "]";
return "registry[" + sessions.size() + " session(s)]";
}
}

View File

@@ -64,12 +64,9 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
SubscribableChannel brokerChannel, Collection<String> destinationPrefixes) {
super(destinationPrefixes);
Assert.notNull(clientInboundChannel, "'clientInboundChannel' must not be null");
Assert.notNull(clientOutboundChannel, "'clientOutboundChannel' must not be null");
Assert.notNull(brokerChannel, "'brokerChannel' must not be null");
this.clientInboundChannel = clientInboundChannel;
this.clientOutboundChannel = clientOutboundChannel;
this.brokerChannel = brokerChannel;
@@ -139,7 +136,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
if (!checkDestinationPrefix(destination)) {
if (logger.isTraceEnabled()) {
logger.trace("Ignoring message to destination=" + destination);
logger.trace("No match on destination in " + message);
}
return;
}
@@ -147,16 +144,10 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
if (SimpMessageType.MESSAGE.equals(messageType)) {
sendMessageToSubscribers(destination, message);
}
else if (SimpMessageType.SUBSCRIBE.equals(messageType)) {
this.subscriptionRegistry.registerSubscription(message);
}
else if (SimpMessageType.UNSUBSCRIBE.equals(messageType)) {
this.subscriptionRegistry.unregisterSubscription(message);
}
else if (SimpMessageType.DISCONNECT.equals(messageType)) {
this.subscriptionRegistry.unregisterAllSubscriptions(sessionId);
}
else if (SimpMessageType.CONNECT.equals(messageType)) {
if (logger.isInfoEnabled()) {
logger.info("Handling CONNECT: " + message);
}
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
initHeaders(accessor);
accessor.setSessionId(sessionId);
@@ -164,9 +155,27 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
Message<byte[]> connectAck = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
this.clientOutboundChannel.send(connectAck);
}
else if (SimpMessageType.DISCONNECT.equals(messageType)) {
if (logger.isInfoEnabled()) {
logger.info("Handling DISCONNECT: " + message);
}
this.subscriptionRegistry.unregisterAllSubscriptions(sessionId);
}
else if (SimpMessageType.SUBSCRIBE.equals(messageType)) {
if (logger.isDebugEnabled()) {
logger.debug("Handling SUBSCRIBE: " + message);
}
this.subscriptionRegistry.registerSubscription(message);
}
else if (SimpMessageType.UNSUBSCRIBE.equals(messageType)) {
if (logger.isDebugEnabled()) {
logger.debug("Handling UNSUBSCRIBE: " + message);
}
this.subscriptionRegistry.unregisterSubscription(message);
}
else {
if (logger.isTraceEnabled()) {
logger.trace("Message type not supported. Ignoring: " + message);
logger.trace("Unsupported message type in " + message);
}
}
}
@@ -179,9 +188,8 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
protected void sendMessageToSubscribers(String destination, Message<?> message) {
MultiValueMap<String,String> subscriptions = this.subscriptionRegistry.findSubscriptions(message);
if ((subscriptions.size() > 0) && logger.isDebugEnabled()) {
logger.debug("Sending message with destination=" + destination
+ " to " + subscriptions.size() + " subscriber(s)");
if ((subscriptions.size() > 0) && logger.isTraceEnabled()) {
logger.trace("Sending to " + subscriptions.size() + " subscriber(s): " + message);
}
for (String sessionId : subscriptions.keySet()) {
for (String subscriptionId : subscriptions.get(sessionId)) {
@@ -196,10 +204,15 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
this.clientOutboundChannel.send(reply);
}
catch (Throwable ex) {
logger.error("Failed to send message=" + message, ex);
logger.error("Failed to send " + message, ex);
}
}
}
}
@Override
public String toString() {
return "SimpleBroker[" + this.subscriptionRegistry + "]";
}
}

View File

@@ -327,6 +327,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return this.tcpClient;
}
/**
* Return the current count of TCP connection to the broker.
*/
public int getConnectionCount() {
return this.connectionHandlers.size();
}
/**
* Configure a {@link MessageHeaderInitializer} to apply to the headers of all
* messages created through the {@code StompBrokerRelayMessageHandler} that
@@ -359,8 +366,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
this.tcpClient = new StompTcpClientFactory().create(this.relayHost, this.relayPort, codec);
}
if (logger.isDebugEnabled()) {
logger.debug("Initializing \"system\" connection");
if (logger.isInfoEnabled()) {
logger.info("Connecting \"system\" session to " + this.relayHost + ":" + this.relayPort);
}
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
@@ -388,7 +395,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
this.tcpClient.shutdown().get(5000, TimeUnit.MILLISECONDS);
}
catch (Throwable t) {
logger.error("Error while shutting down TCP client", t);
logger.error("Error in shutdown of TCP client", t);
}
}
@@ -399,14 +406,15 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
if (!isBrokerAvailable()) {
if (sessionId == null || SystemStompConnectionHandler.SESSION_ID.equals(sessionId)) {
throw new MessageDeliveryException("Message broker is not active.");
throw new MessageDeliveryException("Message broker not active. Consider subscribing to " +
"receive BrokerAvailabilityEvent's from an ApplicationListener Spring bean.");
}
SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(message.getHeaders());
if (messageType.equals(SimpMessageType.CONNECT) && logger.isErrorEnabled()) {
logger.error("Message broker is not active. Ignoring: " + message);
logger.error("Broker not active. Ignoring " + message);
}
else if (logger.isDebugEnabled()) {
logger.debug("Message broker is not active. Ignoring: " + message);
logger.debug("Broker not active. Ignoring " + message);
}
return;
}
@@ -416,8 +424,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
if (accessor == null) {
logger.error("No header accessor, please use SimpMessagingTemplate. Ignoring: " + message);
return;
throw new IllegalStateException(
"No header accessor (not using the SimpMessagingTemplate?): " + message);
}
else if (accessor instanceof StompHeaderAccessor) {
stompAccessor = (StompHeaderAccessor) accessor;
@@ -431,14 +439,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
else {
// Should not happen
logger.error("Unexpected header accessor type: " + accessor + ". Ignoring: " + message);
return;
throw new IllegalStateException(
"Unexpected header accessor type " + accessor.getClass() + " in " + message);
}
if (sessionId == null) {
if (!SimpMessageType.MESSAGE.equals(stompAccessor.getMessageType())) {
logger.error("Only STOMP SEND frames supported on \"system\" connection. Ignoring: " + message);
logger.error("Only STOMP SEND supported from within the server side. Ignoring " + message);
return;
}
sessionId = SystemStompConnectionHandler.SESSION_ID;
@@ -448,18 +455,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
String destination = stompAccessor.getDestination();
if ((command != null) && command.requiresDestination() && !checkDestinationPrefix(destination)) {
if (logger.isTraceEnabled()) {
logger.trace("Ignoring message to destination=" + destination);
logger.trace("No match on destination. Ignoring " + message);
}
return;
}
if (logger.isTraceEnabled()) {
logger.trace("Processing message=" + message);
}
if (StompCommand.CONNECT.equals(command)) {
if (logger.isDebugEnabled()) {
logger.debug("Processing CONNECT (total connected=" + this.connectionHandlers.size() + ")");
logger.debug("STOMP CONNECT in session " + sessionId + " (" + getConnectionCount() + " connections).");
}
stompAccessor = (stompAccessor.isMutable() ? stompAccessor : StompHeaderAccessor.wrap(message));
stompAccessor.setLogin(this.clientLogin);
@@ -474,8 +477,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
else if (StompCommand.DISCONNECT.equals(command)) {
StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
if (handler == null) {
if (logger.isTraceEnabled()) {
logger.trace("Connection already removed for sessionId '" + sessionId + "'");
if (logger.isDebugEnabled()) {
logger.debug("Ignoring DISCONNECT in session " + sessionId + ". Connection already cleaned up.");
}
return;
}
@@ -484,8 +487,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
else {
StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
if (handler == null) {
if (logger.isWarnEnabled()) {
logger.warn("Connection for sessionId '" + sessionId + "' not found. Ignoring message");
if (logger.isDebugEnabled()) {
logger.debug("No TCP connection for session " + sessionId + " in " + message);
}
return;
}
@@ -493,6 +496,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
@Override
public String toString() {
return "StompBrokerRelay[broker=" + this.relayHost + ":" + this.relayPort +
", " + getConnectionCount() + " connection(s)]";
}
private class StompConnectionHandler implements TcpConnectionHandler<byte[]> {
@@ -511,15 +520,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
this(sessionId, connectHeaders, true);
}
private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders,
boolean isRemoteClientSession) {
Assert.notNull(sessionId, "SessionId must not be null");
Assert.notNull(connectHeaders, "ConnectHeaders must not be null");
private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders, boolean isClientSession) {
Assert.notNull(sessionId, "'sessionId' must not be null");
Assert.notNull(connectHeaders, "'connectHeaders' must not be null");
this.sessionId = sessionId;
this.connectHeaders = connectHeaders;
this.isRemoteClientSession = isRemoteClientSession;
this.isRemoteClientSession = isClientSession;
}
public String getSessionId() {
@@ -528,8 +534,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@Override
public void afterConnected(TcpConnection<byte[]> connection) {
if (logger.isDebugEnabled()) {
logger.debug("Established TCP connection to broker in session '" + this.sessionId + "'");
if (logger.isInfoEnabled()) {
logger.info("TCP connection established. Forwarding: " + this.connectHeaders);
}
this.tcpConnection = connection;
connection.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, this.connectHeaders.getMessageHeaders()));
@@ -537,19 +543,19 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@Override
public void afterConnectFailure(Throwable ex) {
handleTcpConnectionFailure("Failed to connect to message broker", ex);
handleTcpConnectionFailure("failed to establish TCP connection in session " + this.sessionId, ex);
}
/**
* Invoked when any TCP connectivity issue is detected, i.e. failure to establish
* the TCP connection, failure to send a message, missed heartbeat.
* the TCP connection, failure to send a message, missed heartbeat, etc.
*/
protected void handleTcpConnectionFailure(String errorMessage, Throwable ex) {
protected void handleTcpConnectionFailure(String error, Throwable ex) {
if (logger.isErrorEnabled()) {
logger.error(errorMessage + ", sessionId '" + this.sessionId + "'", ex);
logger.error("TCP connection failure in session " + this.sessionId + ": " + error, ex);
}
try {
sendStompErrorToClient(errorMessage);
sendStompErrorFrameToClient(error);
}
finally {
try {
@@ -557,13 +563,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
catch (Throwable t) {
if (logger.isErrorEnabled()) {
logger.error("Failed to close connection: " + t.getMessage());
logger.error("Failure while cleaning up state for TCP connection" +
" in session " + this.sessionId, t);
}
}
}
}
private void sendStompErrorToClient(String errorText) {
private void sendStompErrorFrameToClient(String errorText) {
if (this.isRemoteClientSession) {
StompHeaderAccessor headerAccessor = StompHeaderAccessor.create(StompCommand.ERROR);
if (getHeaderInitializer() != null) {
@@ -584,24 +591,21 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@Override
public void handleMessage(Message<byte[]> message) {
StompHeaderAccessor headerAccessor =
MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
StompHeaderAccessor headerAccessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
headerAccessor.setSessionId(this.sessionId);
if (headerAccessor.isHeartbeat()) {
logger.trace("Received broker heartbeat");
if (StompCommand.CONNECTED.equals(headerAccessor.getCommand())) {
if (logger.isInfoEnabled()) {
logger.info("Received STOMP CONNECTED: " + headerAccessor);
}
afterStompConnected(headerAccessor);
}
else if (logger.isDebugEnabled()) {
logger.debug("Received message from broker in session '" + this.sessionId + "'");
}
else if (logger.isErrorEnabled() && StompCommand.ERROR == headerAccessor.getCommand()) {
else if (StompCommand.ERROR.equals(headerAccessor.getCommand()) && logger.isErrorEnabled()) {
logger.error("Received STOMP ERROR: " + message);
}
if (StompCommand.CONNECTED == headerAccessor.getCommand()) {
afterStompConnected(headerAccessor);
else if (logger.isTraceEnabled()) {
logger.trace(headerAccessor.isHeartbeat() ?
"Received heartbeat in session " + this.sessionId : "Received " + headerAccessor);
}
headerAccessor.setImmutable();
@@ -618,12 +622,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
private void initHeartbeats(StompHeaderAccessor connectedHeaders) {
// Remote clients do their own heartbeat management
if (this.isRemoteClientSession) {
return;
}
long clientSendInterval = this.connectHeaders.getHeartbeat()[0];
long clientReceiveInterval = this.connectHeaders.getHeartbeat()[1];
@@ -640,7 +641,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
conn.send(HEARTBEAT_MESSAGE).addCallback(
new ListenableFutureCallback<Void>() {
public void onFailure(Throwable t) {
handleTcpConnectionFailure("Failed to send heartbeat", t);
String error = "failed to forward heartbeat in \"system\" session.";
handleTcpConnectionFailure(error, t);
}
public void onSuccess(Void result) {}
});
@@ -648,25 +650,25 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}, interval);
}
if (clientReceiveInterval > 0 && serverSendInterval > 0) {
final long interval = Math.max(clientReceiveInterval, serverSendInterval) * HEARTBEAT_MULTIPLIER;
this.tcpConnection.onReadInactivity(new Runnable() {
@Override
public void run() {
handleTcpConnectionFailure("No hearbeat from broker for more than " +
interval + "ms, closing connection", null);
handleTcpConnectionFailure("no messages received for more than " + interval + " ms.", null);
}
}, interval);
}, interval);
}
}
@Override
public void handleFailure(Throwable ex) {
if (this.tcpConnection == null) {
return;
public void handleFailure(Throwable failure) {
if (this.tcpConnection != null) {
handleTcpConnectionFailure("transport failure.", failure);
}
else if (logger.isErrorEnabled()) {
logger.error("Transport failure: " + failure);
}
handleTcpConnectionFailure("Closing connection after TCP failure", ex);
}
@Override
@@ -675,17 +677,19 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return;
}
try {
if (logger.isDebugEnabled()) {
logger.debug("TCP connection to broker closed in session '" + this.sessionId + "'");
if (logger.isInfoEnabled()) {
logger.info("TCP connection to broker closed in session " + this.sessionId);
}
sendStompErrorToClient("Connection to broker closed");
sendStompErrorFrameToClient("Connection to broker closed.");
}
finally {
try {
// Prevent clearConnection() from trying to close
this.tcpConnection = null;
clearConnection();
}
catch (Throwable t) {
// Ignore
// Shouldn't happen with connection reset beforehand
}
}
}
@@ -697,7 +701,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
* received the STOMP CONNECTED frame. For client messages this should be
* false only if we lose the TCP connection around the same time when a
* client message is being forwarded, so we simply log the ignored message
* at trace level. For messages from within the application being sent on
* at debug level. For messages from within the application being sent on
* the "system" connection an exception is raised so that components sending
* the message have a chance to handle it -- by default the broker message
* channel is synchronous.
@@ -714,78 +718,77 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
* @return a future to wait for the result
*/
@SuppressWarnings("unchecked")
public ListenableFuture<Void> forward(Message<?> message, final StompHeaderAccessor headerAccessor) {
public ListenableFuture<Void> forward(Message<?> message, final StompHeaderAccessor accessor) {
TcpConnection<byte[]> conn = this.tcpConnection;
if (!this.isStompConnected) {
if (this.isRemoteClientSession) {
if (logger.isDebugEnabled()) {
logger.debug("Ignoring client message received " + message +
(conn != null ? "before CONNECTED frame" : "after TCP connection closed"));
logger.debug("TCP connection closed already, ignoring " + message);
}
return EMPTY_TASK;
}
else {
throw new IllegalStateException("Cannot forward messages on system connection " +
(conn != null ? "before STOMP CONNECTED frame" : "while inactive") +
". Try listening for BrokerAvailabilityEvent ApplicationContext events.");
throw new IllegalStateException("Cannot forward messages " +
(conn != null ? "before STOMP CONNECTED. " : "while inactive. ") +
"Consider subscribing to receive BrokerAvailabilityEvent's from " +
"an ApplicationListener Spring bean. Dropped " + message);
}
}
if (logger.isDebugEnabled()) {
if (headerAccessor.isHeartbeat()) {
logger.trace("Forwarding heartbeat to broker");
}
else {
logger.debug("Forwarding message to broker");
}
final Message<?> messageToSend = (accessor.isMutable() && accessor.isModified()) ?
MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders()) : message;
StompCommand command = accessor.getCommand();
if (accessor.isHeartbeat()) {
logger.trace("Forwarding heartbeat in session " + this.sessionId);
}
else if (StompCommand.SUBSCRIBE.equals(command) && logger.isDebugEnabled()) {
logger.debug("Forwarding SUBSCRIBE: " + messageToSend);
}
else if (StompCommand.UNSUBSCRIBE.equals(command) && logger.isDebugEnabled()) {
logger.debug("Forwarding UNSUBSCRIBE: " + messageToSend);
}
else if (StompCommand.DISCONNECT.equals(command) && logger.isInfoEnabled()) {
logger.info("Forwarding DISCONNECT: " + messageToSend);
}
else if (logger.isTraceEnabled()) {
logger.trace("Forwarding " + command + ": " + messageToSend);
}
if (headerAccessor.isMutable() && headerAccessor.isModified()) {
message = MessageBuilder.createMessage(message.getPayload(), headerAccessor.getMessageHeaders());
}
ListenableFuture<Void> future = conn.send((Message<byte[]>) message);
ListenableFuture<Void> future = conn.send((Message<byte[]>) messageToSend);
future.addCallback(new ListenableFutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
if (headerAccessor.getCommand() == StompCommand.DISCONNECT) {
if (accessor.getCommand() == StompCommand.DISCONNECT) {
clearConnection();
}
}
@Override
public void onFailure(Throwable t) {
if (tcpConnection == null) {
// already reset
if (tcpConnection != null) {
handleTcpConnectionFailure("failed to forward " + messageToSend, t);
}
else {
handleTcpConnectionFailure("Failed to send message " + headerAccessor, t);
else if (logger.isErrorEnabled()) {
logger.error("Failed to forward " + messageToSend);
}
}
});
return future;
}
/**
* Close the TCP connection to the broker and release the connection reference,
* Any exception arising from closing the connection is propagated. The caller
* must handle and log the exception accordingly.
*
* <p>If the connection belongs to a client session, the connection handler
* for the session (basically the current instance) is also released from the
* {@link org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler}.
* Clean up state associated with the connection and close it.
* Any exception arising from closing the connection are propagated.
*/
public void clearConnection() {
if (logger.isDebugEnabled()) {
logger.debug("Cleaning up connection state for session " + sessionId + " (" +
(getConnectionCount() - 1) + " remaining connections).");
}
if (this.isRemoteClientSession) {
if (logger.isDebugEnabled()) {
logger.debug("Removing session '" + sessionId + "' (total remaining=" +
(StompBrokerRelayMessageHandler.this.connectionHandlers.size() - 1) + ")");
}
StompBrokerRelayMessageHandler.this.connectionHandlers.remove(this.sessionId);
}
@@ -794,13 +797,16 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
TcpConnection<byte[]> conn = this.tcpConnection;
this.tcpConnection = null;
if (conn != null) {
if (logger.isInfoEnabled()) {
logger.info("Closing TCP connection in session " + this.sessionId);
}
conn.close();
}
}
@Override
public String toString() {
return "StompConnectionHandler{" + "sessionId=" + this.sessionId + "}";
return "StompConnectionHandler[sessionId=" + this.sessionId + "]";
}
}
@@ -820,8 +826,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
@Override
protected void handleTcpConnectionFailure(String errorMessage, Throwable t) {
super.handleTcpConnectionFailure(errorMessage, t);
protected void handleTcpConnectionFailure(String error, Throwable t) {
super.handleTcpConnectionFailure(error, t);
publishBrokerUnavailableEvent();
}
@@ -832,9 +838,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
@Override
public ListenableFuture<Void> forward(Message<?> message, StompHeaderAccessor headerAccessor) {
public ListenableFuture<Void> forward(Message<?> message, StompHeaderAccessor accessor) {
try {
ListenableFuture<Void> future = super.forward(message, headerAccessor);
ListenableFuture<Void> future = super.forward(message, accessor);
future.get();
return future;
}

View File

@@ -18,6 +18,8 @@ package org.springframework.messaging.simp.stomp;
import org.springframework.core.NestedRuntimeException;
/**
* Raised after a failure to encode or decode a STOMP message.
*
* @author Gary Russell
* @since 4.0
*/

View File

@@ -156,13 +156,13 @@ public class StompDecoder {
headerAccessor.updateSimpMessageHeadersFromStompHeaders();
headerAccessor.setLeaveMutable(true);
decodedMessage = MessageBuilder.createMessage(payload, headerAccessor.getMessageHeaders());
if (logger.isDebugEnabled()) {
logger.debug("Decoded " + decodedMessage);
if (logger.isTraceEnabled()) {
logger.trace("Decoded " + decodedMessage);
}
}
else {
if (logger.isTraceEnabled()) {
logger.trace("Received incomplete frame. Resetting buffer.");
logger.trace("Incomplete frame, resetting input buffer.");
}
if (headers != null && headerAccessor != null) {
String name = NativeMessageHeaderAccessor.NATIVE_HEADERS;
@@ -177,7 +177,7 @@ public class StompDecoder {
}
else {
if (logger.isTraceEnabled()) {
logger.trace("Decoded heartbeat");
logger.trace("Decoded heartbeat.");
}
StompHeaderAccessor headerAccessor = StompHeaderAccessor.createForHeartbeat();
initHeaders(headerAccessor);
@@ -224,8 +224,8 @@ public class StompDecoder {
int colonIndex = header.indexOf(':');
if ((colonIndex <= 0) || (colonIndex == header.length() - 1)) {
if (buffer.remaining() > 0) {
throw new StompConversionException(
"Illegal header: '" + header + "'. A header must be of the form <name>:<value>");
throw new StompConversionException("Illegal header: '" + header +
"'. A header must be of the form <name>:<value>.");
}
}
else {

View File

@@ -91,7 +91,7 @@ public final class StompEncoder {
return baos.toByteArray();
}
catch (IOException e) {
throw new StompConversionException("Failed to encode STOMP frame", e);
throw new StompConversionException("Failed to encode STOMP frame, headers=" + headers + ".", e);
}
}
@@ -102,8 +102,8 @@ public final class StompEncoder {
Map<String,List<String>> nativeHeaders =
(Map<String, List<String>>) headers.get(NativeMessageHeaderAccessor.NATIVE_HEADERS);
if (logger.isDebugEnabled()) {
logger.debug("Encoding STOMP " + command + ", headers=" + nativeHeaders);
if (logger.isTraceEnabled()) {
logger.trace("Encoding STOMP " + command + ", headers=" + nativeHeaders + ".");
}
if (nativeHeaders == null) {

View File

@@ -100,13 +100,11 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver {
@Override
public UserDestinationResult resolveDestination(Message<?> message) {
String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
DestinationInfo info = parseUserDestination(message);
if (info == null) {
return null;
}
Set<String> resolved = new HashSet<String>();
for (String sessionId : info.getSessionIds()) {
String d = getTargetDestination(destination, info.getDestinationWithoutPrefix(), sessionId, info.getUser());
@@ -114,12 +112,10 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver {
resolved.add(d);
}
}
return new UserDestinationResult(destination, resolved, info.getSubscribeDestination(), info.getUser());
}
private DestinationInfo parseUserDestination(Message<?> message) {
MessageHeaders headers = message.getHeaders();
SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
String destination = SimpMessageHeaderAccessor.getDestination(headers);
@@ -131,12 +127,13 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver {
String user;
Set<String> sessionIds;
if (destination == null || !checkDestination(destination, this.destinationPrefix)) {
return null;
}
if (SimpMessageType.SUBSCRIBE.equals(messageType) || SimpMessageType.UNSUBSCRIBE.equals(messageType)) {
if (!checkDestination(destination, this.destinationPrefix)) {
return null;
}
if (sessionId == null) {
logger.error("Ignoring message, no session id available");
logger.error("No session id. Ignoring " + message);
return null;
}
destinationWithoutPrefix = destination.substring(this.destinationPrefix.length()-1);
@@ -145,9 +142,6 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver {
sessionIds = Collections.singleton(sessionId);
}
else if (SimpMessageType.MESSAGE.equals(messageType)) {
if (!checkDestination(destination, this.destinationPrefix)) {
return null;
}
int startIndex = this.destinationPrefix.length();
int endIndex = destination.indexOf('/', startIndex);
Assert.isTrue(endIndex > 0, "Expected destination pattern \"/user/{userId}/**\"");
@@ -160,27 +154,13 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver {
Collections.singleton(sessionId) : this.userSessionRegistry.getSessionIds(user));
}
else {
if (logger.isTraceEnabled()) {
logger.trace("Ignoring " + messageType + " message");
}
return null;
}
return new DestinationInfo(destinationWithoutPrefix, subscribeDestination, user, sessionIds);
}
protected boolean checkDestination(String destination, String requiredPrefix) {
if (destination == null) {
logger.trace("Ignoring message, no destination");
return false;
}
if (!destination.startsWith(requiredPrefix)) {
if (logger.isTraceEnabled()) {
logger.trace("Ignoring message to " + destination + ", not a \"user\" destination");
}
return false;
}
return true;
return destination.startsWith(requiredPrefix);
}
/**
@@ -237,6 +217,12 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver {
public Set<String> getSessionIds() {
return this.sessionIds;
}
@Override
public String toString() {
return "DestinationInfo[destination=" + this.destinationWithoutPrefix + ", subscribeDestination=" +
this.subscribeDestination + ", user=" + this.user + ", sessionIds=" + this.sessionIds + "]";
}
}
}

View File

@@ -171,8 +171,8 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
}
Set<String> destinations = result.getTargetDestinations();
if (destinations.isEmpty()) {
if (logger.isTraceEnabled()) {
logger.trace("No target destinations, message=" + message);
if (logger.isDebugEnabled()) {
logger.debug("Use destination not resolved (no active sessions?): " + message);
}
return;
}
@@ -184,8 +184,8 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
message = MessageBuilder.createMessage(message.getPayload(), headerAccessor.getMessageHeaders());
}
for (String destination : destinations) {
if (logger.isDebugEnabled()) {
logger.debug("Sending message to resolved destination=" + destination);
if (logger.isTraceEnabled()) {
logger.trace("Sending message with resolved user destination: " + message);
}
this.brokerMessagingTemplate.send(destination, message);
}

View File

@@ -91,4 +91,10 @@ public class UserDestinationResult {
public String getUser() {
return this.user;
}
@Override
public String toString() {
return "UserDestinationResult[source=" + this.sourceDestination + ", target=" + this.targetDestinations +
", subscribeDestination=" + this.subscribeDestination + ", user=" + this.user + "]";
}
}

View File

@@ -100,18 +100,14 @@ public abstract class AbstractMessageChannel implements MessageChannel, BeanName
@Override
public final boolean send(Message<?> message, long timeout) {
Assert.notNull(message, "Message must not be null");
if (logger.isTraceEnabled()) {
logger.trace("[" + this.beanName + "] sending message=" + message);
logger.trace(this + " sending " + message);
}
message = this.interceptorChain.preSend(message, this);
if (message == null) {
return false;
}
try {
boolean sent = sendInternal(message, timeout);
this.interceptorChain.postSend(message, this, sent);
@@ -121,8 +117,7 @@ public abstract class AbstractMessageChannel implements MessageChannel, BeanName
if (e instanceof MessagingException) {
throw (MessagingException) e;
}
throw new MessageDeliveryException(message,
"Failed to send message to channel '" + this.getBeanName() + "'", e);
throw new MessageDeliveryException(message,"Failed to send message to " + this, e);
}
}
@@ -131,7 +126,7 @@ public abstract class AbstractMessageChannel implements MessageChannel, BeanName
@Override
public String toString() {
return "MessageChannel [name=" + this.beanName + "]";
return "MessageChannel[name=" + this.beanName + "]";
}
}

View File

@@ -47,7 +47,7 @@ public abstract class AbstractSubscribableChannel extends AbstractMessageChannel
boolean result = this.handlers.add(handler);
if (result) {
if (logger.isDebugEnabled()) {
logger.debug("[" + getBeanName() + "] subscribed " + handler);
logger.debug(this + " subscribed " + handler);
}
}
return result;
@@ -58,7 +58,7 @@ public abstract class AbstractSubscribableChannel extends AbstractMessageChannel
boolean result = this.handlers.remove(handler);
if (result) {
if (logger.isDebugEnabled()) {
logger.debug("[" + getBeanName() + "] unsubscribed " + handler);
logger.debug(this + " unsubscribed " + handler);
}
}
return result;

View File

@@ -471,7 +471,7 @@ public class MessageHeaderAccessor {
@Override
public String toString() {
return getClass().getSimpleName() + " [headers=" + this.headers + "]";
return getClass().getSimpleName() + "[headers=" + this.headers + "]";
}