From 0340cc5f03c2d2ece2352d8f04bdc87344d27acc Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 7 Nov 2013 17:33:13 -0500 Subject: [PATCH] Update user destinations handling Before this change subscribing to a user-specific destination in STOMP required manually appending a unique queue suffix provided in a header with the CONNECTED frame. This change removes the need to do that. Instead STOMP clients can subscribe to "/user/queue/error" and can then begin to receive messages sent to "/user/{username}/queue/error" without colliding with any other user doing the same. Issue: SPR-11077 --- .../simp/config/MessageBrokerConfigurer.java | 26 +++ .../config/ServletStompEndpointRegistry.java | 8 +- ...cketMessageBrokerConfigurationSupport.java | 31 ++- .../handler/AbstractSubscriptionRegistry.java | 6 +- .../DefaultUserDestinationResolver.java | 187 ++++++++++++++++++ .../handler/DefaultUserSessionRegistry.java | 79 ++++++++ .../SimpleUserQueueSuffixResolver.java | 75 ------- .../UserDestinationMessageHandler.java | 138 +++---------- .../simp/handler/UserDestinationResolver.java | 54 +++++ .../simp/handler/UserQueueSuffixResolver.java | 48 ----- .../simp/handler/UserSessionRegistry.java | 44 +++++ .../simp/stomp/StompProtocolHandler.java | 43 ++-- .../messaging/simp/TestPrincipal.java} | 37 +++- .../ServletStompEndpointRegistryTests.java | 12 +- ...essageBrokerConfigurationSupportTests.java | 6 +- .../DefaultUserDestinationResolverTests.java | 120 +++++++++++ ...a => DefaultUserSessionRegistryTests.java} | 37 ++-- .../UserDestinationMessageHandlerTests.java | 126 ++++++++++++ .../simp/stomp/StompProtocolHandlerTests.java | 3 +- .../web/socket/support/TestPrincipal.java | 3 +- 20 files changed, 783 insertions(+), 300 deletions(-) create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/handler/DefaultUserDestinationResolver.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/handler/DefaultUserSessionRegistry.java delete mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolver.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationResolver.java delete mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserQueueSuffixResolver.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserSessionRegistry.java rename spring-messaging/src/{main/java/org/springframework/messaging/simp/handler/MutableUserQueueSuffixResolver.java => test/java/org/springframework/messaging/simp/TestPrincipal.java} (51%) create mode 100644 spring-messaging/src/test/java/org/springframework/messaging/simp/handler/DefaultUserDestinationResolverTests.java rename spring-messaging/src/test/java/org/springframework/messaging/simp/handler/{SimpleUserQueueSuffixResolverTests.java => DefaultUserSessionRegistryTests.java} (54%) create mode 100644 spring-messaging/src/test/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandlerTests.java diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerConfigurer.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerConfigurer.java index 51c5bfe681..96f4ba183e 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerConfigurer.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerConfigurer.java @@ -39,6 +39,8 @@ public class MessageBrokerConfigurer { private String[] annotationMethodDestinationPrefixes; + private String userDestinationPrefix; + public MessageBrokerConfigurer(MessageChannel webSocketResponseChannel) { Assert.notNull(webSocketResponseChannel); @@ -81,6 +83,26 @@ public class MessageBrokerConfigurer { return this; } + /** + * Configure the prefix used to identify user destinations. User destinations + * provide the ability for a user to subscribe to queue names unique to their + * session as well as for others to send messages to those unique, + * user-specific queues. + *

+ * For example when a user attempts to subscribe to "/user/queue/position-updates", + * the destination may be translated to "/queue/position-updatesi9oqdfzo" yielding a + * unique queue name that does not collide with any other user attempting to do the same. + * Subsequently when messages are sent to "/user/{username}/queue/position-updates", + * the destination is translated to "/queue/position-updatesi9oqdfzo". + *

+ * The default prefix used to identify such destinations is "/user/". + */ + public MessageBrokerConfigurer setUserDestinationPrefix(String destinationPrefix) { + this.userDestinationPrefix = destinationPrefix; + return this; + } + + protected AbstractBrokerMessageHandler getSimpleBroker() { initSimpleBrokerIfNecessary(); return (this.simpleBroker != null) ? this.simpleBroker.getMessageHandler() : null; @@ -100,4 +122,8 @@ public class MessageBrokerConfigurer { return (this.annotationMethodDestinationPrefixes != null) ? Arrays.asList(this.annotationMethodDestinationPrefixes) : null; } + + protected String getUserDestinationPrefix() { + return this.userDestinationPrefix; + } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistry.java index 3e1050c920..0dec614f1d 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistry.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistry.java @@ -22,7 +22,7 @@ import java.util.List; import java.util.Map; import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler; -import org.springframework.messaging.simp.handler.MutableUserQueueSuffixResolver; +import org.springframework.messaging.simp.handler.UserSessionRegistry; import org.springframework.messaging.simp.stomp.StompProtocolHandler; import org.springframework.scheduling.TaskScheduler; import org.springframework.util.Assert; @@ -54,15 +54,15 @@ public class ServletStompEndpointRegistry implements StompEndpointRegistry { public ServletStompEndpointRegistry(WebSocketHandler webSocketHandler, - MutableUserQueueSuffixResolver userQueueSuffixResolver, TaskScheduler defaultSockJsTaskScheduler) { + UserSessionRegistry userSessionRegistry, TaskScheduler defaultSockJsTaskScheduler) { Assert.notNull(webSocketHandler); - Assert.notNull(userQueueSuffixResolver); + Assert.notNull(userSessionRegistry); this.webSocketHandler = webSocketHandler; this.subProtocolWebSocketHandler = findSubProtocolWebSocketHandler(webSocketHandler); this.stompHandler = new StompProtocolHandler(); - this.stompHandler.setUserQueueSuffixResolver(userQueueSuffixResolver); + this.stompHandler.setUserSessionRegistry(userSessionRegistry); this.sockJsScheduler = defaultSockJsTaskScheduler; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupport.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupport.java index 9ebb9d842c..30e376fda3 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupport.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupport.java @@ -68,8 +68,9 @@ public abstract class WebSocketMessageBrokerConfigurationSupport { @Bean public HandlerMapping brokerWebSocketHandlerMapping() { - ServletStompEndpointRegistry registry = new ServletStompEndpointRegistry(subProtocolWebSocketHandler(), - userQueueSuffixResolver(), brokerDefaultSockJsTaskScheduler()); + + ServletStompEndpointRegistry registry = new ServletStompEndpointRegistry( + subProtocolWebSocketHandler(), userSessionRegistry(), brokerDefaultSockJsTaskScheduler()); registerStompEndpoints(registry); AbstractHandlerMapping hm = registry.getHandlerMapping(); @@ -85,13 +86,13 @@ public abstract class WebSocketMessageBrokerConfigurationSupport { } @Bean - public MutableUserQueueSuffixResolver userQueueSuffixResolver() { - return new SimpleUserQueueSuffixResolver(); + public UserSessionRegistry userSessionRegistry() { + return new DefaultUserSessionRegistry(); } /** * The default TaskScheduler to use if none is configured via - * {@link SockJsServiceRegistration#setTaskScheduler()}, i.e. + * {@link SockJsServiceRegistration#setTaskScheduler(org.springframework.scheduling.TaskScheduler)}, i.e. *

 	 * @Configuration
 	 * @EnableWebSocketMessageBroker
@@ -137,8 +138,10 @@ public abstract class WebSocketMessageBrokerConfigurationSupport {
 
 	@Bean
 	public SimpAnnotationMethodMessageHandler annotationMethodMessageHandler() {
+
 		SimpAnnotationMethodMessageHandler handler =
 				new SimpAnnotationMethodMessageHandler(brokerMessagingTemplate(), webSocketResponseChannel());
+
 		handler.setDestinationPrefixes(getMessageBrokerConfigurer().getAnnotationMethodDestinationPrefixes());
 		handler.setMessageConverter(simpMessageConverter());
 		webSocketRequestChannel().subscribe(handler);
@@ -185,8 +188,10 @@ public abstract class WebSocketMessageBrokerConfigurationSupport {
 
 	@Bean
 	public UserDestinationMessageHandler userDestinationMessageHandler() {
+
 		UserDestinationMessageHandler handler = new UserDestinationMessageHandler(
-				brokerMessagingTemplate(), userQueueSuffixResolver());
+				brokerMessagingTemplate(), userDestinationResolver());
+
 		webSocketRequestChannel().subscribe(handler);
 		brokerChannel().subscribe(handler);
 		return handler;
@@ -195,6 +200,10 @@ public abstract class WebSocketMessageBrokerConfigurationSupport {
 	@Bean
 	public SimpMessageSendingOperations brokerMessagingTemplate() {
 		SimpMessagingTemplate template = new SimpMessagingTemplate(brokerChannel());
+		String userDestinationPrefix = getMessageBrokerConfigurer().getUserDestinationPrefix();
+		if (userDestinationPrefix != null) {
+			template.setUserDestinationPrefix(userDestinationPrefix);
+		}
 		template.setMessageConverter(simpMessageConverter());
 		return template;
 	}
@@ -220,6 +229,16 @@ public abstract class WebSocketMessageBrokerConfigurationSupport {
 		return new CompositeMessageConverter(converters, contentTypeResolver);
 	}
 
+	@Bean
+	public UserDestinationResolver userDestinationResolver() {
+		DefaultUserDestinationResolver resolver = new DefaultUserDestinationResolver(userSessionRegistry());
+		String prefix = getMessageBrokerConfigurer().getUserDestinationPrefix();
+		if (prefix != null) {
+			resolver.setUserDestinationPrefix(prefix);
+		}
+		return resolver;
+	}
+
 
 	private static final AbstractBrokerMessageHandler noopBroker = new AbstractBrokerMessageHandler(null) {
 
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AbstractSubscriptionRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AbstractSubscriptionRegistry.java
index a3cad786d8..0da9e7a521 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AbstractSubscriptionRegistry.java
+++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AbstractSubscriptionRegistry.java
@@ -108,7 +108,11 @@ public abstract class AbstractSubscriptionRegistry implements SubscriptionRegist
 		if (logger.isTraceEnabled()) {
 			logger.trace("Find subscriptions, destination=" + headers.getDestination());
 		}
-		return findSubscriptionsInternal(destination, message);
+		MultiValueMap result = findSubscriptionsInternal(destination, message);
+		if (logger.isTraceEnabled()) {
+			logger.trace("Found " + result.size() + " subscriptions");
+		}
+		return result;
 	}
 
 	protected abstract MultiValueMap findSubscriptionsInternal(
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/DefaultUserDestinationResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/DefaultUserDestinationResolver.java
new file mode 100644
index 0000000000..d5a3e88b2f
--- /dev/null
+++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/DefaultUserDestinationResolver.java
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2002-2013 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.messaging.simp.handler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
+import org.springframework.messaging.simp.SimpMessageType;
+import org.springframework.util.Assert;
+
+import java.security.Principal;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A default implementation of {@link @UserDestinationResolver}.
+ * 

+ * Resolves messages sent to destination patterns "/user/{user-name}/**" as well as + * subscriptions to destinations "/user/queue/**" where the "/user/" prefix used to + * recognize such destinations is customizable via + * {@link #setUserDestinationPrefix(String)}. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class DefaultUserDestinationResolver implements UserDestinationResolver { + + private static final Log logger = LogFactory.getLog(DefaultUserDestinationResolver.class); + + + private final UserSessionRegistry userSessionRegistry; + + private String destinationPrefix = "/user/"; + + private String subscriptionDestinationPrefix = "/user/queue/"; + + + /** + * Create an instance that will access user session id information through + * the provided registry. + * + * @param userSessionRegistry the registry, never {@code null} + */ + public DefaultUserDestinationResolver(UserSessionRegistry userSessionRegistry) { + Assert.notNull(userSessionRegistry, "'userSessionRegistry' is required"); + this.userSessionRegistry = userSessionRegistry; + } + + /** + * The prefix used to identify user destinations. Any destinations that do not + * start with the given prefix are not be resolved. + *

+ * The default value is "/user/". + * + * @param prefix the prefix to use + */ + public void setUserDestinationPrefix(String prefix) { + Assert.hasText(prefix, "prefix is required"); + this.destinationPrefix = prefix.endsWith("/") ? prefix : prefix + "/"; + this.subscriptionDestinationPrefix = this.destinationPrefix + "queue/"; + } + + /** + * Return the prefix used to identify user destinations. Any destinations that do not + * start with the given prefix are not be resolved. + *

+ * By default "/user/queue/". + */ + public String getDestinationPrefix() { + return this.destinationPrefix; + } + + /** + * Return the prefix used to identify user destinations for (un)subscribe messages. + *

+ * By default "/user/queue/". + */ + public String getSubscriptionDestinationPrefix() { + return this.subscriptionDestinationPrefix; + } + + @Override + public Set resolveDestination(Message message) { + + SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message); + UserDestinationInfo info = getUserDestinationInfo(headers); + if (info == null) { + return Collections.emptySet(); + } + + Set set = new HashSet(); + for (String sessionId : this.userSessionRegistry.getSessionIds(info.getUser())) { + set.add(getTargetDestination(headers.getDestination(), info.getDestination(), sessionId, info.getUser())); + } + return set; + } + + protected String getTargetDestination(String originalDestination, String targetDestination, + String sessionId, String user) { + + return targetDestination + "-user" + sessionId; + } + + private UserDestinationInfo getUserDestinationInfo(SimpMessageHeaderAccessor headers) { + + String destination = headers.getDestination(); + if (destination == null) { + logger.trace("Ignoring message, no destination"); + return null; + } + + String targetUser; + String targetDestination; + + Principal user = headers.getUser(); + SimpMessageType messageType = headers.getMessageType(); + + if (SimpMessageType.SUBSCRIBE.equals(messageType) || SimpMessageType.UNSUBSCRIBE.equals(messageType)) { + if (user == null) { + logger.trace("Ignoring (un)subscribe message, no user information"); + return null; + } + if (!destination.startsWith(this.subscriptionDestinationPrefix)) { + logger.trace("Ignoring (un)subscribe message, not a \"user\" destination"); + return null; + } + targetUser = user.getName(); + targetDestination = destination.substring(this.destinationPrefix.length()-1); + } + else if (SimpMessageType.MESSAGE.equals(messageType)) { + if (!destination.startsWith(this.destinationPrefix)) { + logger.trace("Ignoring message, not a \"user\" destination"); + return null; + } + int startIndex = this.destinationPrefix.length(); + int endIndex = destination.indexOf('/', startIndex); + Assert.isTrue(endIndex > 0, "Expected destination pattern \"/user/{userId}/**\""); + targetUser = destination.substring(startIndex, endIndex); + targetDestination = destination.substring(endIndex); + + } + else { + logger.trace("Ignoring message, not of the right message type"); + return null; + } + + return new UserDestinationInfo(targetUser, targetDestination); + } + + + private static class UserDestinationInfo { + + private final String user; + + private final String destination; + + private UserDestinationInfo(String user, String destination) { + this.user = user; + this.destination = destination; + } + + private String getUser() { + return this.user; + } + + private String getDestination() { + return this.destination; + } + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/DefaultUserSessionRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/DefaultUserSessionRegistry.java new file mode 100644 index 0000000000..e8dc33b8a7 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/DefaultUserSessionRegistry.java @@ -0,0 +1,79 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.handler; + +import org.springframework.util.Assert; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArraySet; + +/** + * A default thread-safe implementation of {@link @UserSessionRegistry}. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class DefaultUserSessionRegistry implements UserSessionRegistry { + + // userId -> sessionId + private final ConcurrentMap> userSessionIds = new ConcurrentHashMap>(); + + private final Object lock = new Object(); + + + @Override + public Set getSessionIds(String user) { + Set set = this.userSessionIds.get(user); + return (set != null) ? set : Collections.emptySet(); + } + + @Override + public void registerSessionId(String user, String sessionId) { + + Assert.notNull(user, "'user' is required"); + Assert.notNull(user, "'sessionId' is required"); + + synchronized (this.lock) { + Set set = this.userSessionIds.get(user); + if (set == null) { + set = new CopyOnWriteArraySet(); + this.userSessionIds.put(user, set); + } + set.add(sessionId); + } + } + + @Override + public void unregisterSessionId(String userName, String sessionId) { + + Assert.notNull(userName, "'userName' is required"); + Assert.notNull(userName, "'sessionId' is required"); + + synchronized (lock) { + Set set = this.userSessionIds.get(userName); + if (set != null) { + if (set.remove(sessionId) && set.isEmpty()) { + this.userSessionIds.remove(userName); + } + } + } + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolver.java deleted file mode 100644 index 6b3d018b59..0000000000 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolver.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright 2002-2013 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.messaging.simp.handler; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - - -/** - * @author Rossen Stoyanchev - * @since 4.0 - */ -public class SimpleUserQueueSuffixResolver implements MutableUserQueueSuffixResolver { - - // userId -> [sessionId -> queueSuffix] - private final ConcurrentMap> cache = new ConcurrentHashMap>(); - - - @Override - public void addQueueSuffix(String user, String sessionId, String suffix) { - Map suffixes = this.cache.get(user); - if (suffixes == null) { - suffixes = new ConcurrentHashMap(); - Map prevSuffixes = this.cache.putIfAbsent(user, suffixes); - if (prevSuffixes != null) { - suffixes = prevSuffixes; - } - } - suffixes.put(sessionId, suffix); - } - - @Override - public void removeQueueSuffix(String user, String sessionId) { - Map suffixes = this.cache.get(user); - if (suffixes != null) { - if (suffixes.remove(sessionId) != null) { - this.cache.remove(user, Collections.emptyMap()); - } - } - } - - @Override - public Set getUserQueueSuffixes(String user) { - Map suffixes = this.cache.get(user); - return (suffixes != null) ? new HashSet(suffixes.values()) : Collections.emptySet(); - } - - @Override - public String getUserQueueSuffix(String user, String sessionId) { - Map suffixes = this.cache.get(user); - if (suffixes != null) { - return suffixes.get(sessionId); - } - return null; - } - -} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java index a645a270ec..d1da249399 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java @@ -23,20 +23,19 @@ import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.messaging.core.MessageSendingOperations; import org.springframework.messaging.simp.SimpMessageHeaderAccessor; -import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; +import org.springframework.util.CollectionUtils; + +import java.util.Set; /** - * Supports destinations prefixed with "/user/{username}", transforms the - * destination to a unique queue to which the user is subscribed, and then sends - * the message for further processing. - * - *

The target destination has the prefix removed and a unique queue suffix, - * resolved via {@link #setUserQueueSuffixResolver(UserQueueSuffixResolver)}, appended. - * For example a destination such as "/user/john/queue/trade-confirmation" could - * be transformed to "/queue/trade-confirmation/i9oqdfzo". + * Provides support for messages sent to "user" destinations, translating the + * destination to one or more user-specific destination(s) and then sending message(s) + * with the updated target destination using the provided messaging template. + *

+ * See {@link UserDestinationResolver} for more details and examples. * * @author Rossen Stoyanchev * @since 4.0 @@ -45,140 +44,67 @@ public class UserDestinationMessageHandler implements MessageHandler { private static final Log logger = LogFactory.getLog(UserDestinationMessageHandler.class); + private final MessageSendingOperations messagingTemplate; - private String destinationPrefix = "/user/"; - - private UserQueueSuffixResolver userQueueSuffixResolver = new SimpleUserQueueSuffixResolver(); + private final UserDestinationResolver userDestinationResolver; /** + * Create an instance of the handler with the given messaging template and a + * user destination resolver. * - * @param messagingTemplate - * @param resolver the resolver to use to find queue suffixes for a user + * @param messagingTemplate a messaging template to use for sending messages + * with translated user destinations + * @param userDestinationResolver the resolver to use to find queue suffixes for a user */ public UserDestinationMessageHandler(MessageSendingOperations messagingTemplate, - UserQueueSuffixResolver userQueueSuffixResolver) { + UserDestinationResolver userDestinationResolver) { Assert.notNull(messagingTemplate, "messagingTemplate is required"); - Assert.notNull(userQueueSuffixResolver, "userQueueSuffixResolver is required"); + Assert.notNull(userDestinationResolver, "destinationResolver is required"); this.messagingTemplate = messagingTemplate; - this.userQueueSuffixResolver = userQueueSuffixResolver; + this.userDestinationResolver = userDestinationResolver; } /** - *

The default prefix is "/user". - * @param prefix the prefix to set + * Return the configured {@link UserDestinationResolver}. */ - public void setDestinationPrefix(String prefix) { - Assert.hasText(prefix, "prefix is required"); - this.destinationPrefix = prefix.endsWith("/") ? prefix : prefix + "/"; + public UserDestinationResolver getUserDestinationResolver() { + return this.userDestinationResolver; } /** - * @return the prefix - */ - public String getDestinationPrefix() { - return this.destinationPrefix; - } - - /** - * @return the resolver for queue suffixes for a user - */ - public UserQueueSuffixResolver getUserQueueSuffixResolver() { - return this.userQueueSuffixResolver; - } - - /** - * @return the messagingTemplate + * Return the configured messaging template for sending messages with + * translated destinations. */ public MessageSendingOperations getMessagingTemplate() { return this.messagingTemplate; } + @Override public void handleMessage(Message message) throws MessagingException { - SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message); - SimpMessageType messageType = headers.getMessageType(); - String destination = headers.getDestination(); - - if (!SimpMessageType.MESSAGE.equals(messageType)) { - return; - } - - if (!checkDestination(destination)) { - return; - } - if (logger.isTraceEnabled()) { - logger.trace("Processing message to destination " + destination); + logger.trace("Handling message " + message); } - UserDestinationParser destinationParser = new UserDestinationParser(destination); - String user = destinationParser.getUser(); - - if (user == null) { - if (logger.isErrorEnabled()) { - logger.error("Ignoring message, expected destination pattern \"" + this.destinationPrefix - + "{userId}/**\": " + destination); - } + Set destinations = this.userDestinationResolver.resolveDestination(message); + if (CollectionUtils.isEmpty(destinations)) { return; } - for (String sessionId : this.userQueueSuffixResolver.getUserQueueSuffixes(user)) { - - String targetDestination = destinationParser.getTargetDestination(sessionId); + for (String targetDestination : destinations) { + if (logger.isTraceEnabled()) { + logger.trace("Sending message to resolved user destination: " + targetDestination); + } + SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message); headers.setDestination(targetDestination); message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build(); - - if (logger.isTraceEnabled()) { - logger.trace("Sending message to resolved target destination " + targetDestination); - } this.messagingTemplate.send(targetDestination, message); } } - private boolean checkDestination(String destination) { - if (destination != null) { - if (destination.startsWith(this.destinationPrefix)) { - return true; - } - } - return false; - } - - - private class UserDestinationParser { - - private final String user; - - private final String targetDestination; - - - public UserDestinationParser(String destination) { - - int userStartIndex = destinationPrefix.length(); - int userEndIndex = destination.indexOf('/', userStartIndex); - - if (userEndIndex > 0) { - this.user = destination.substring(userStartIndex, userEndIndex); - this.targetDestination = destination.substring(userEndIndex); - } - else { - this.user = null; - this.targetDestination = null; - } - } - - public String getUser() { - return this.user; - } - - public String getTargetDestination(String sessionId) { - return (this.targetDestination != null) ? this.targetDestination + sessionId : null; - } - } - } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationResolver.java new file mode 100644 index 0000000000..420c7ec6d7 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationResolver.java @@ -0,0 +1,54 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.handler; + +import org.springframework.messaging.Message; + +import java.util.Set; + + +/** + * A strategy for resolving unique, user destinations per session. User destinations + * provide a user with the ability to subscribe to a queue unique to their session + * as well others with the ability to send messages to those queues. + *

+ * For example when a user attempts to subscribe to "/user/queue/position-updates", + * the destination may be resolved to "/queue/position-updates-useri9oqdfzo" yielding a + * unique queue name that does not collide with any other user attempting to do the same. + * Subsequently when messages are sent to "/user/{username}/queue/position-updates", + * the destination is translated to "/queue/position-updates-useri9oqdfzo". + * + * @author Rossen Stoyanchev + * @since 4.0 + * + * @see UserDestinationMessageHandler + */ +public interface UserDestinationResolver { + + /** + * Resolve the destination of the message to one or more user/session-specific target + * destinations. If the user has multiple sessions, the method may return more than + * one target destinations. + * + * @param message the message to resolve + * + * @return the resolved unique user destinations or an empty Set if the message + * destination is not recognized as a user destination + */ + Set resolveDestination(Message message); + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserQueueSuffixResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserQueueSuffixResolver.java deleted file mode 100644 index 0acf82c253..0000000000 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserQueueSuffixResolver.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2002-2013 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.messaging.simp.handler; - -import java.util.Set; - - -/** - * A strategy for resolving unique queue suffixes for a connected user. - * There can be only one suffix per user per session. - * - * @author Rossen Stoyanchev - * @since 4.0 - */ -public interface UserQueueSuffixResolver { - - /** - * Retrieve the suffixes for all sessions associated with this user. - * - * @param user the user name - * @return a Set with zero, one, or more, queue suffixes - */ - Set getUserQueueSuffixes(String user); - - /** - * Retrieve the queue suffix associated with the given user session. - * - * @param user the user name - * @param sessionId the session id - * @return a queue suffix or {@code null} - */ - String getUserQueueSuffix(String user, String sessionId); - -} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserSessionRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserSessionRegistry.java new file mode 100644 index 0000000000..42e9272b0f --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserSessionRegistry.java @@ -0,0 +1,44 @@ +package org.springframework.messaging.simp.handler; + +import java.util.Set; + +/** + * A registry for looking up active session id's by user. + *

+ * Used in support of resolving unique session-specific user destinations. + * See {@link DefaultUserDestinationResolver} for more details. + * + * @author Rossen Stoyanchev + * @since 4.0 + * + * @see DefaultUserDestinationResolver + */ +public interface UserSessionRegistry { + + + /** + * Return the active session id's for the given user. + * + * @param user the user + * @return a set with 0 or more session id's + */ + Set getSessionIds(String user); + + /** + * Register an active session id for the given user. + * + * @param user the user + * @param sessionId the session id + */ + void registerSessionId(String user, String sessionId); + + /** + * Unregister the session id for a user. + * + * @param user the user + * @param sessionId the session id + */ + void unregisterSessionId(String user, String sessionId); + + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java index 2a4b17cdbb..17e4d51ebe 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java @@ -30,8 +30,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.handler.websocket.SubProtocolHandler; import org.springframework.messaging.simp.SimpMessageType; -import org.springframework.messaging.simp.handler.MutableUserQueueSuffixResolver; -import org.springframework.messaging.simp.handler.SimpleUserQueueSuffixResolver; +import org.springframework.messaging.simp.handler.UserSessionRegistry; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; import org.springframework.web.socket.CloseStatus; @@ -45,6 +44,7 @@ import org.springframework.web.socket.WebSocketSession; * * @author Rossen Stoyanchev * @author Andy Wilkinson + * @since 4.0 */ public class StompProtocolHandler implements SubProtocolHandler { @@ -54,36 +54,30 @@ public class StompProtocolHandler implements SubProtocolHandler { */ public static final String CONNECTED_USER_HEADER = "user-name"; - /** - * A suffix unique to the current session that a client can use to append to - * a destination to make it unique. - * - * @see {@link org.springframework.messaging.simp.handler.UserDestinationMessageHandler} - */ - public static final String QUEUE_SUFFIX_HEADER = "queue-suffix"; + private static final Log logger = LogFactory.getLog(StompProtocolHandler.class); - private final Log logger = LogFactory.getLog(StompProtocolHandler.class); private final StompDecoder stompDecoder = new StompDecoder(); private final StompEncoder stompEncoder = new StompEncoder(); - private MutableUserQueueSuffixResolver queueSuffixResolver = new SimpleUserQueueSuffixResolver(); + private UserSessionRegistry userSessionRegistry; /** - * Configure a resolver to use to maintain queue suffixes for user + * Provide a registry with which to register active user session ids. + * * @see {@link org.springframework.messaging.simp.handler.UserDestinationMessageHandler} */ - public void setUserQueueSuffixResolver(MutableUserQueueSuffixResolver resolver) { - this.queueSuffixResolver = resolver; + public void setUserSessionRegistry(UserSessionRegistry registry) { + this.userSessionRegistry = registry; } /** - * @return the resolver for queue suffixes for a user + * @return the configured UserSessionRegistry. */ - public MutableUserQueueSuffixResolver getUserQueueSuffixResolver() { - return this.queueSuffixResolver; + public UserSessionRegistry getUserSessionRegistry() { + return this.userSessionRegistry; } @Override @@ -162,7 +156,7 @@ public class StompProtocolHandler implements SubProtocolHandler { } if (headers.getCommand() == StompCommand.CONNECTED) { - augmentConnectedHeaders(headers, session); + afterStompSessionConnected(headers, session); } if (StompCommand.MESSAGE.equals(headers.getCommand()) && (headers.getSubscriptionId() == null)) { @@ -189,6 +183,7 @@ public class StompProtocolHandler implements SubProtocolHandler { session.close(CloseStatus.PROTOCOL_ERROR); } catch (IOException e) { + // Ignore } } } @@ -216,15 +211,13 @@ public class StompProtocolHandler implements SubProtocolHandler { } } - private void augmentConnectedHeaders(StompHeaderAccessor headers, WebSocketSession session) { + private void afterStompSessionConnected(StompHeaderAccessor headers, WebSocketSession session) { Principal principal = session.getPrincipal(); if (principal != null) { headers.setNativeHeader(CONNECTED_USER_HEADER, principal.getName()); - headers.setNativeHeader(QUEUE_SUFFIX_HEADER, session.getId()); - - if (this.queueSuffixResolver != null) { + if (this.userSessionRegistry != null) { String suffix = session.getId(); - this.queueSuffixResolver.addQueueSuffix(principal.getName(), session.getId(), suffix); + this.userSessionRegistry.registerSessionId(principal.getName(), session.getId()); } } } @@ -242,8 +235,8 @@ public class StompProtocolHandler implements SubProtocolHandler { @Override public void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus, MessageChannel outputChannel) { - if ((this.queueSuffixResolver != null) && (session.getPrincipal() != null)) { - this.queueSuffixResolver.removeQueueSuffix(session.getPrincipal().getName(), session.getId()); + if ((this.userSessionRegistry != null) && (session.getPrincipal() != null)) { + this.userSessionRegistry.unregisterSessionId(session.getPrincipal().getName(), session.getId()); } StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserQueueSuffixResolver.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/TestPrincipal.java similarity index 51% rename from spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserQueueSuffixResolver.java rename to spring-messaging/src/test/java/org/springframework/messaging/simp/TestPrincipal.java index 1fe326958d..ec03cc7dcd 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserQueueSuffixResolver.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/TestPrincipal.java @@ -14,17 +14,44 @@ * limitations under the License. */ -package org.springframework.messaging.simp.handler; +package org.springframework.messaging.simp; + +import java.security.Principal; /** + * An implementation of {@link Principal} for testing. + * * @author Rossen Stoyanchev - * @since 4.0 */ -public interface MutableUserQueueSuffixResolver extends UserQueueSuffixResolver { +public class TestPrincipal implements Principal { - void addQueueSuffix(String user, String sessionId, String suffix); + private String name; - void removeQueueSuffix(String user, String sessionId); + public TestPrincipal(String name) { + this.name = name; + } + + @Override + public String getName() { + return this.name; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof TestPrincipal)) { + return false; + } + TestPrincipal p = (TestPrincipal) obj; + return this.name.equals(p.name); + } + + @Override + public int hashCode() { + return this.name.hashCode(); + } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistryTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistryTests.java index 7531e8ed20..480e9f34c7 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistryTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistryTests.java @@ -24,8 +24,8 @@ import org.mockito.Mockito; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.handler.websocket.SubProtocolHandler; import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler; -import org.springframework.messaging.simp.handler.MutableUserQueueSuffixResolver; -import org.springframework.messaging.simp.handler.SimpleUserQueueSuffixResolver; +import org.springframework.messaging.simp.handler.DefaultUserSessionRegistry; +import org.springframework.messaging.simp.handler.UserSessionRegistry; import org.springframework.messaging.simp.stomp.StompProtocolHandler; import org.springframework.scheduling.TaskScheduler; import org.springframework.web.servlet.handler.SimpleUrlHandlerMapping; @@ -44,16 +44,16 @@ public class ServletStompEndpointRegistryTests { private SubProtocolWebSocketHandler webSocketHandler; - private MutableUserQueueSuffixResolver queueSuffixResolver; + private UserSessionRegistry userSessionRegistry; @Before public void setup() { MessageChannel channel = Mockito.mock(MessageChannel.class); this.webSocketHandler = new SubProtocolWebSocketHandler(channel); - this.queueSuffixResolver = new SimpleUserQueueSuffixResolver(); + this.userSessionRegistry = new DefaultUserSessionRegistry(); TaskScheduler taskScheduler = Mockito.mock(TaskScheduler.class); - this.registry = new ServletStompEndpointRegistry(webSocketHandler, queueSuffixResolver, taskScheduler); + this.registry = new ServletStompEndpointRegistry(webSocketHandler, userSessionRegistry, taskScheduler); } @@ -69,7 +69,7 @@ public class ServletStompEndpointRegistryTests { assertNotNull(protocolHandlers.get("v12.stomp")); StompProtocolHandler stompHandler = (StompProtocolHandler) protocolHandlers.get("v10.stomp"); - assertSame(this.queueSuffixResolver, stompHandler.getUserQueueSuffixResolver()); + assertSame(this.userSessionRegistry, stompHandler.getUserSessionRegistry()); } @Test diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupportTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupportTests.java index fdf6ad7499..bf01420b39 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupportTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupportTests.java @@ -35,9 +35,9 @@ import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandl import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.simp.annotation.SubscribeEvent; import org.springframework.messaging.simp.handler.SimpAnnotationMethodMessageHandler; -import org.springframework.messaging.simp.handler.MutableUserQueueSuffixResolver; import org.springframework.messaging.simp.handler.SimpleBrokerMessageHandler; import org.springframework.messaging.simp.handler.UserDestinationMessageHandler; +import org.springframework.messaging.simp.handler.UserSessionRegistry; import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler; import org.springframework.messaging.simp.stomp.StompCommand; import org.springframework.messaging.simp.stomp.StompHeaderAccessor; @@ -259,7 +259,7 @@ public class WebSocketMessageBrokerConfigurationSupportTests { SubscribableChannel channel = this.cxtSimpleBroker.getBean("brokerChannel", SubscribableChannel.class); UserDestinationMessageHandler messageHandler = this.cxtSimpleBroker.getBean(UserDestinationMessageHandler.class); - this.cxtSimpleBroker.getBean(MutableUserQueueSuffixResolver.class).addQueueSuffix("joe", "s1", "s1"); + this.cxtSimpleBroker.getBean(UserSessionRegistry.class).registerSessionId("joe", "s1"); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); headers.setDestination("/user/joe/foo"); @@ -274,7 +274,7 @@ public class WebSocketMessageBrokerConfigurationSupportTests { headers = StompHeaderAccessor.wrap(message); assertEquals(SimpMessageType.MESSAGE, headers.getMessageType()); - assertEquals("/foos1", headers.getDestination()); + assertEquals("/foo-users1", headers.getDestination()); } @Test diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/DefaultUserDestinationResolverTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/DefaultUserDestinationResolverTests.java new file mode 100644 index 0000000000..4a62301f12 --- /dev/null +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/DefaultUserDestinationResolverTests.java @@ -0,0 +1,120 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.handler; + +import org.junit.Before; +import org.junit.Test; +import org.springframework.messaging.Message; +import org.springframework.messaging.simp.SimpMessageHeaderAccessor; +import org.springframework.messaging.simp.SimpMessageType; +import org.springframework.messaging.simp.TestPrincipal; +import org.springframework.messaging.support.MessageBuilder; + +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +/** + * Unit tests for {@link DefaultUserDestinationResolver}. + */ +public class DefaultUserDestinationResolverTests { + + private DefaultUserDestinationResolver resolver; + + private UserSessionRegistry registry; + + + @Before + public void setup() { + this.registry = new DefaultUserSessionRegistry(); + this.resolver = new DefaultUserDestinationResolver(this.registry); + } + + + @Test + public void handleSubscribe() { + Message message = createMessage(SimpMessageType.SUBSCRIBE, "joe", "/user/queue/foo"); + this.registry.registerSessionId("joe", "123"); + Set actual = this.resolver.resolveDestination(message); + + assertEquals(1, actual.size()); + assertEquals("/queue/foo-user123", actual.iterator().next()); + } + + @Test + public void handleUnsubscribe() { + Message message = createMessage(SimpMessageType.UNSUBSCRIBE, "joe", "/user/queue/foo"); + this.registry.registerSessionId("joe", "123"); + Set actual = this.resolver.resolveDestination(message); + + assertEquals(1, actual.size()); + assertEquals("/queue/foo-user123", actual.iterator().next()); + } + + @Test + public void handleMessage() { + Message message = createMessage(SimpMessageType.MESSAGE, "joe", "/user/joe/queue/foo"); + this.registry.registerSessionId("joe", "123"); + Set actual = this.resolver.resolveDestination(message); + + assertEquals(1, actual.size()); + assertEquals("/queue/foo-user123", actual.iterator().next()); + } + + + @Test + public void ignoreMessage() { + + // no destination + Message message = createMessage(SimpMessageType.MESSAGE, "joe", null); + Set actual = this.resolver.resolveDestination(message); + assertEquals(0, actual.size()); + + // not a user destination + message = createMessage(SimpMessageType.MESSAGE, "joe", "/queue/foo"); + actual = this.resolver.resolveDestination(message); + assertEquals(0, actual.size()); + + // subscribe + no user + message = createMessage(SimpMessageType.SUBSCRIBE, null, "/user/queue/foo"); + actual = this.resolver.resolveDestination(message); + assertEquals(0, actual.size()); + + // subscribe + not a user destination + message = createMessage(SimpMessageType.SUBSCRIBE, "joe", "/queue/foo"); + actual = this.resolver.resolveDestination(message); + assertEquals(0, actual.size()); + + // no match on message type + message = createMessage(SimpMessageType.CONNECT, "joe", "user/joe/queue/foo"); + actual = this.resolver.resolveDestination(message); + assertEquals(0, actual.size()); + } + + + private Message createMessage(SimpMessageType messageType, String user, String destination) { + SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(messageType); + if (destination != null) { + headers.setDestination(destination); + } + if (user != null) { + headers.setUser(new TestPrincipal(user)); + } + return MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build(); + } + +} diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolverTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/DefaultUserSessionRegistryTests.java similarity index 54% rename from spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolverTests.java rename to spring-messaging/src/test/java/org/springframework/messaging/simp/handler/DefaultUserSessionRegistryTests.java index 8a2386640c..e03a8dfdf1 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolverTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/DefaultUserSessionRegistryTests.java @@ -27,56 +27,57 @@ import static org.junit.Assert.*; /** - * Test fixture for {@link SimpleUserQueueSuffixResolver} + * Test fixture for {@link DefaultUserSessionRegistry} * * @author Rossen Stoyanchev * @since 4.0 */ -public class SimpleUserQueueSuffixResolverTests { +public class DefaultUserSessionRegistryTests { private static final String user = "joe"; + private static final List sessionIds = Arrays.asList("sess01", "sess02", "sess03"); @Test public void addOneSessionId() { - SimpleUserQueueSuffixResolver resolver = new SimpleUserQueueSuffixResolver(); - resolver.addQueueSuffix(user, sessionIds.get(0), sessionIds.get(0)); + DefaultUserSessionRegistry resolver = new DefaultUserSessionRegistry(); + resolver.registerSessionId(user, sessionIds.get(0)); - assertEquals(Collections.singleton(sessionIds.get(0)), resolver.getUserQueueSuffixes(user)); - assertSame(Collections.emptySet(), resolver.getUserQueueSuffixes("jane")); + assertEquals(Collections.singleton(sessionIds.get(0)), resolver.getSessionIds(user)); + assertSame(Collections.emptySet(), resolver.getSessionIds("jane")); } @Test public void addMultipleSessionIds() { - SimpleUserQueueSuffixResolver resolver = new SimpleUserQueueSuffixResolver(); + DefaultUserSessionRegistry resolver = new DefaultUserSessionRegistry(); for (String sessionId : sessionIds) { - resolver.addQueueSuffix(user, sessionId, sessionId); + resolver.registerSessionId(user, sessionId); } - assertEquals(new LinkedHashSet<>(sessionIds), resolver.getUserQueueSuffixes(user)); - assertEquals(Collections.emptySet(), resolver.getUserQueueSuffixes("jane")); + assertEquals(new LinkedHashSet<>(sessionIds), resolver.getSessionIds(user)); + assertEquals(Collections.emptySet(), resolver.getSessionIds("jane")); } @Test public void removeSessionIds() { - SimpleUserQueueSuffixResolver resolver = new SimpleUserQueueSuffixResolver(); + DefaultUserSessionRegistry resolver = new DefaultUserSessionRegistry(); for (String sessionId : sessionIds) { - resolver.addQueueSuffix(user, sessionId, sessionId); + resolver.registerSessionId(user, sessionId); } - assertEquals(new LinkedHashSet<>(sessionIds), resolver.getUserQueueSuffixes(user)); + assertEquals(new LinkedHashSet<>(sessionIds), resolver.getSessionIds(user)); - resolver.removeQueueSuffix(user, sessionIds.get(1)); - resolver.removeQueueSuffix(user, sessionIds.get(2)); - assertEquals(Collections.singleton(sessionIds.get(0)), resolver.getUserQueueSuffixes(user)); + resolver.unregisterSessionId(user, sessionIds.get(1)); + resolver.unregisterSessionId(user, sessionIds.get(2)); + assertEquals(Collections.singleton(sessionIds.get(0)), resolver.getSessionIds(user)); - resolver.removeQueueSuffix(user, sessionIds.get(0)); - assertSame(Collections.emptySet(), resolver.getUserQueueSuffixes(user)); + resolver.unregisterSessionId(user, sessionIds.get(0)); + assertSame(Collections.emptySet(), resolver.getSessionIds(user)); } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandlerTests.java new file mode 100644 index 0000000000..c5324e2c36 --- /dev/null +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandlerTests.java @@ -0,0 +1,126 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.handler; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.springframework.messaging.Message; +import org.springframework.messaging.core.MessageSendingOperations; +import org.springframework.messaging.simp.SimpMessageHeaderAccessor; +import org.springframework.messaging.simp.SimpMessageType; +import org.springframework.messaging.simp.TestPrincipal; +import org.springframework.messaging.support.MessageBuilder; + +import static org.junit.Assert.assertEquals; + +/** + * Unit tests for {@link UserDestinationMessageHandler}. + */ +public class UserDestinationMessageHandlerTests { + + private UserDestinationMessageHandler messageHandler; + + private MessageSendingOperations messagingTemplate; + + private UserSessionRegistry registry; + + + @Before + public void setup() { + this.messagingTemplate = Mockito.mock(MessageSendingOperations.class); + this.registry = new DefaultUserSessionRegistry(); + DefaultUserDestinationResolver resolver = new DefaultUserDestinationResolver(this.registry); + this.messageHandler = new UserDestinationMessageHandler(this.messagingTemplate, resolver); + } + + + @Test + public void handleSubscribe() { + this.registry.registerSessionId("joe", "123"); + this.messageHandler.handleMessage(createMessage(SimpMessageType.SUBSCRIBE, "joe", "/user/queue/foo")); + + ArgumentCaptor captor1 = ArgumentCaptor.forClass(String.class); + ArgumentCaptor captor2 = ArgumentCaptor.forClass(Message.class); + Mockito.verify(this.messagingTemplate).send(captor1.capture(), captor2.capture()); + + assertEquals("/queue/foo-user123", captor1.getValue()); + } + + @Test + public void handleUnsubscribe() { + this.registry.registerSessionId("joe", "123"); + this.messageHandler.handleMessage(createMessage(SimpMessageType.UNSUBSCRIBE, "joe", "/user/queue/foo")); + + ArgumentCaptor captor1 = ArgumentCaptor.forClass(String.class); + ArgumentCaptor captor2 = ArgumentCaptor.forClass(Message.class); + Mockito.verify(this.messagingTemplate).send(captor1.capture(), captor2.capture()); + + assertEquals("/queue/foo-user123", captor1.getValue()); + } + + @Test + public void handleMessage() { + this.registry.registerSessionId("joe", "123"); + this.messageHandler.handleMessage(createMessage(SimpMessageType.MESSAGE, "joe", "/user/joe/queue/foo")); + + ArgumentCaptor captor1 = ArgumentCaptor.forClass(String.class); + ArgumentCaptor captor2 = ArgumentCaptor.forClass(Message.class); + Mockito.verify(this.messagingTemplate).send(captor1.capture(), captor2.capture()); + + assertEquals("/queue/foo-user123", captor1.getValue()); + } + + + @Test + public void ignoreMessage() { + + // no destination + this.messageHandler.handleMessage(createMessage(SimpMessageType.MESSAGE, "joe", null)); + Mockito.verifyZeroInteractions(this.messagingTemplate); + + // not a user destination + this.messageHandler.handleMessage(createMessage(SimpMessageType.MESSAGE, "joe", "/queue/foo")); + Mockito.verifyZeroInteractions(this.messagingTemplate); + + // subscribe + no user + this.messageHandler.handleMessage(createMessage(SimpMessageType.SUBSCRIBE, null, "/user/queue/foo")); + Mockito.verifyZeroInteractions(this.messagingTemplate); + + // subscribe + not a user destination + this.messageHandler.handleMessage(createMessage(SimpMessageType.SUBSCRIBE, "joe", "/queue/foo")); + Mockito.verifyZeroInteractions(this.messagingTemplate); + + // no match on message type + this.messageHandler.handleMessage(createMessage(SimpMessageType.CONNECT, "joe", "user/joe/queue/foo")); + Mockito.verifyZeroInteractions(this.messagingTemplate); + } + + + private Message createMessage(SimpMessageType messageType, String user, String destination) { + SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(messageType); + if (destination != null) { + headers.setDestination(destination); + } + if (user != null) { + headers.setUser(new TestPrincipal(user)); + } + return MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build(); + } + +} diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompProtocolHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompProtocolHandlerTests.java index 74c5dd8976..bcc5ad7e05 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompProtocolHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompProtocolHandlerTests.java @@ -28,9 +28,9 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.simp.SimpMessageHeaderAccessor; import org.springframework.messaging.simp.SimpMessageType; +import org.springframework.messaging.simp.TestPrincipal; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.socket.TextMessage; -import org.springframework.web.socket.support.TestPrincipal; import org.springframework.web.socket.support.TestWebSocketSession; import static org.junit.Assert.*; @@ -90,7 +90,6 @@ public class StompProtocolHandlerTests { assertEquals("1.1", replyHeaders.getVersion()); assertArrayEquals(new long[] {0, 0}, replyHeaders.getHeartbeat()); assertEquals("joe", replyHeaders.getNativeHeader("user-name").get(0)); - assertEquals("s1", replyHeaders.getNativeHeader("queue-suffix").get(0)); } @Test diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/support/TestPrincipal.java b/spring-websocket/src/test/java/org/springframework/web/socket/support/TestPrincipal.java index 866bdba2ba..f6a755ebd9 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/support/TestPrincipal.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/support/TestPrincipal.java @@ -20,7 +20,8 @@ import java.security.Principal; /** - * An implementation of Prinicipal for testing. + * An implementation of {@link Principal} for testing. + * * @author Rossen Stoyanchev */ public class TestPrincipal implements Principal {