diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerConfigurer.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerConfigurer.java index 51c5bfe681..96f4ba183e 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerConfigurer.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerConfigurer.java @@ -39,6 +39,8 @@ public class MessageBrokerConfigurer { private String[] annotationMethodDestinationPrefixes; + private String userDestinationPrefix; + public MessageBrokerConfigurer(MessageChannel webSocketResponseChannel) { Assert.notNull(webSocketResponseChannel); @@ -81,6 +83,26 @@ public class MessageBrokerConfigurer { return this; } + /** + * Configure the prefix used to identify user destinations. User destinations + * provide the ability for a user to subscribe to queue names unique to their + * session as well as for others to send messages to those unique, + * user-specific queues. + *
+ * For example when a user attempts to subscribe to "/user/queue/position-updates", + * the destination may be translated to "/queue/position-updatesi9oqdfzo" yielding a + * unique queue name that does not collide with any other user attempting to do the same. + * Subsequently when messages are sent to "/user/{username}/queue/position-updates", + * the destination is translated to "/queue/position-updatesi9oqdfzo". + *
+ * The default prefix used to identify such destinations is "/user/". + */ + public MessageBrokerConfigurer setUserDestinationPrefix(String destinationPrefix) { + this.userDestinationPrefix = destinationPrefix; + return this; + } + + protected AbstractBrokerMessageHandler getSimpleBroker() { initSimpleBrokerIfNecessary(); return (this.simpleBroker != null) ? this.simpleBroker.getMessageHandler() : null; @@ -100,4 +122,8 @@ public class MessageBrokerConfigurer { return (this.annotationMethodDestinationPrefixes != null) ? Arrays.asList(this.annotationMethodDestinationPrefixes) : null; } + + protected String getUserDestinationPrefix() { + return this.userDestinationPrefix; + } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistry.java index 3e1050c920..0dec614f1d 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistry.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistry.java @@ -22,7 +22,7 @@ import java.util.List; import java.util.Map; import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler; -import org.springframework.messaging.simp.handler.MutableUserQueueSuffixResolver; +import org.springframework.messaging.simp.handler.UserSessionRegistry; import org.springframework.messaging.simp.stomp.StompProtocolHandler; import org.springframework.scheduling.TaskScheduler; import org.springframework.util.Assert; @@ -54,15 +54,15 @@ public class ServletStompEndpointRegistry implements StompEndpointRegistry { public ServletStompEndpointRegistry(WebSocketHandler webSocketHandler, - MutableUserQueueSuffixResolver userQueueSuffixResolver, TaskScheduler defaultSockJsTaskScheduler) { + UserSessionRegistry userSessionRegistry, TaskScheduler defaultSockJsTaskScheduler) { Assert.notNull(webSocketHandler); - Assert.notNull(userQueueSuffixResolver); + Assert.notNull(userSessionRegistry); this.webSocketHandler = webSocketHandler; this.subProtocolWebSocketHandler = findSubProtocolWebSocketHandler(webSocketHandler); this.stompHandler = new StompProtocolHandler(); - this.stompHandler.setUserQueueSuffixResolver(userQueueSuffixResolver); + this.stompHandler.setUserSessionRegistry(userSessionRegistry); this.sockJsScheduler = defaultSockJsTaskScheduler; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupport.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupport.java index 9ebb9d842c..30e376fda3 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupport.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupport.java @@ -68,8 +68,9 @@ public abstract class WebSocketMessageBrokerConfigurationSupport { @Bean public HandlerMapping brokerWebSocketHandlerMapping() { - ServletStompEndpointRegistry registry = new ServletStompEndpointRegistry(subProtocolWebSocketHandler(), - userQueueSuffixResolver(), brokerDefaultSockJsTaskScheduler()); + + ServletStompEndpointRegistry registry = new ServletStompEndpointRegistry( + subProtocolWebSocketHandler(), userSessionRegistry(), brokerDefaultSockJsTaskScheduler()); registerStompEndpoints(registry); AbstractHandlerMapping hm = registry.getHandlerMapping(); @@ -85,13 +86,13 @@ public abstract class WebSocketMessageBrokerConfigurationSupport { } @Bean - public MutableUserQueueSuffixResolver userQueueSuffixResolver() { - return new SimpleUserQueueSuffixResolver(); + public UserSessionRegistry userSessionRegistry() { + return new DefaultUserSessionRegistry(); } /** * The default TaskScheduler to use if none is configured via - * {@link SockJsServiceRegistration#setTaskScheduler()}, i.e. + * {@link SockJsServiceRegistration#setTaskScheduler(org.springframework.scheduling.TaskScheduler)}, i.e. *
* @Configuration
* @EnableWebSocketMessageBroker
@@ -137,8 +138,10 @@ public abstract class WebSocketMessageBrokerConfigurationSupport {
@Bean
public SimpAnnotationMethodMessageHandler annotationMethodMessageHandler() {
+
SimpAnnotationMethodMessageHandler handler =
new SimpAnnotationMethodMessageHandler(brokerMessagingTemplate(), webSocketResponseChannel());
+
handler.setDestinationPrefixes(getMessageBrokerConfigurer().getAnnotationMethodDestinationPrefixes());
handler.setMessageConverter(simpMessageConverter());
webSocketRequestChannel().subscribe(handler);
@@ -185,8 +188,10 @@ public abstract class WebSocketMessageBrokerConfigurationSupport {
@Bean
public UserDestinationMessageHandler userDestinationMessageHandler() {
+
UserDestinationMessageHandler handler = new UserDestinationMessageHandler(
- brokerMessagingTemplate(), userQueueSuffixResolver());
+ brokerMessagingTemplate(), userDestinationResolver());
+
webSocketRequestChannel().subscribe(handler);
brokerChannel().subscribe(handler);
return handler;
@@ -195,6 +200,10 @@ public abstract class WebSocketMessageBrokerConfigurationSupport {
@Bean
public SimpMessageSendingOperations brokerMessagingTemplate() {
SimpMessagingTemplate template = new SimpMessagingTemplate(brokerChannel());
+ String userDestinationPrefix = getMessageBrokerConfigurer().getUserDestinationPrefix();
+ if (userDestinationPrefix != null) {
+ template.setUserDestinationPrefix(userDestinationPrefix);
+ }
template.setMessageConverter(simpMessageConverter());
return template;
}
@@ -220,6 +229,16 @@ public abstract class WebSocketMessageBrokerConfigurationSupport {
return new CompositeMessageConverter(converters, contentTypeResolver);
}
+ @Bean
+ public UserDestinationResolver userDestinationResolver() {
+ DefaultUserDestinationResolver resolver = new DefaultUserDestinationResolver(userSessionRegistry());
+ String prefix = getMessageBrokerConfigurer().getUserDestinationPrefix();
+ if (prefix != null) {
+ resolver.setUserDestinationPrefix(prefix);
+ }
+ return resolver;
+ }
+
private static final AbstractBrokerMessageHandler noopBroker = new AbstractBrokerMessageHandler(null) {
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AbstractSubscriptionRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AbstractSubscriptionRegistry.java
index a3cad786d8..0da9e7a521 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AbstractSubscriptionRegistry.java
+++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AbstractSubscriptionRegistry.java
@@ -108,7 +108,11 @@ public abstract class AbstractSubscriptionRegistry implements SubscriptionRegist
if (logger.isTraceEnabled()) {
logger.trace("Find subscriptions, destination=" + headers.getDestination());
}
- return findSubscriptionsInternal(destination, message);
+ MultiValueMap result = findSubscriptionsInternal(destination, message);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Found " + result.size() + " subscriptions");
+ }
+ return result;
}
protected abstract MultiValueMap findSubscriptionsInternal(
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/DefaultUserDestinationResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/DefaultUserDestinationResolver.java
new file mode 100644
index 0000000000..d5a3e88b2f
--- /dev/null
+++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/DefaultUserDestinationResolver.java
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2002-2013 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.messaging.simp.handler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
+import org.springframework.messaging.simp.SimpMessageType;
+import org.springframework.util.Assert;
+
+import java.security.Principal;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A default implementation of {@link @UserDestinationResolver}.
+ *
+ * Resolves messages sent to destination patterns "/user/{user-name}/**" as well as
+ * subscriptions to destinations "/user/queue/**" where the "/user/" prefix used to
+ * recognize such destinations is customizable via
+ * {@link #setUserDestinationPrefix(String)}.
+ *
+ * @author Rossen Stoyanchev
+ * @since 4.0
+ */
+public class DefaultUserDestinationResolver implements UserDestinationResolver {
+
+ private static final Log logger = LogFactory.getLog(DefaultUserDestinationResolver.class);
+
+
+ private final UserSessionRegistry userSessionRegistry;
+
+ private String destinationPrefix = "/user/";
+
+ private String subscriptionDestinationPrefix = "/user/queue/";
+
+
+ /**
+ * Create an instance that will access user session id information through
+ * the provided registry.
+ *
+ * @param userSessionRegistry the registry, never {@code null}
+ */
+ public DefaultUserDestinationResolver(UserSessionRegistry userSessionRegistry) {
+ Assert.notNull(userSessionRegistry, "'userSessionRegistry' is required");
+ this.userSessionRegistry = userSessionRegistry;
+ }
+
+ /**
+ * The prefix used to identify user destinations. Any destinations that do not
+ * start with the given prefix are not be resolved.
+ *
+ * The default value is "/user/".
+ *
+ * @param prefix the prefix to use
+ */
+ public void setUserDestinationPrefix(String prefix) {
+ Assert.hasText(prefix, "prefix is required");
+ this.destinationPrefix = prefix.endsWith("/") ? prefix : prefix + "/";
+ this.subscriptionDestinationPrefix = this.destinationPrefix + "queue/";
+ }
+
+ /**
+ * Return the prefix used to identify user destinations. Any destinations that do not
+ * start with the given prefix are not be resolved.
+ *
+ * By default "/user/queue/".
+ */
+ public String getDestinationPrefix() {
+ return this.destinationPrefix;
+ }
+
+ /**
+ * Return the prefix used to identify user destinations for (un)subscribe messages.
+ *
+ * By default "/user/queue/".
+ */
+ public String getSubscriptionDestinationPrefix() {
+ return this.subscriptionDestinationPrefix;
+ }
+
+ @Override
+ public Set resolveDestination(Message> message) {
+
+ SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
+ UserDestinationInfo info = getUserDestinationInfo(headers);
+ if (info == null) {
+ return Collections.emptySet();
+ }
+
+ Set set = new HashSet();
+ for (String sessionId : this.userSessionRegistry.getSessionIds(info.getUser())) {
+ set.add(getTargetDestination(headers.getDestination(), info.getDestination(), sessionId, info.getUser()));
+ }
+ return set;
+ }
+
+ protected String getTargetDestination(String originalDestination, String targetDestination,
+ String sessionId, String user) {
+
+ return targetDestination + "-user" + sessionId;
+ }
+
+ private UserDestinationInfo getUserDestinationInfo(SimpMessageHeaderAccessor headers) {
+
+ String destination = headers.getDestination();
+ if (destination == null) {
+ logger.trace("Ignoring message, no destination");
+ return null;
+ }
+
+ String targetUser;
+ String targetDestination;
+
+ Principal user = headers.getUser();
+ SimpMessageType messageType = headers.getMessageType();
+
+ if (SimpMessageType.SUBSCRIBE.equals(messageType) || SimpMessageType.UNSUBSCRIBE.equals(messageType)) {
+ if (user == null) {
+ logger.trace("Ignoring (un)subscribe message, no user information");
+ return null;
+ }
+ if (!destination.startsWith(this.subscriptionDestinationPrefix)) {
+ logger.trace("Ignoring (un)subscribe message, not a \"user\" destination");
+ return null;
+ }
+ targetUser = user.getName();
+ targetDestination = destination.substring(this.destinationPrefix.length()-1);
+ }
+ else if (SimpMessageType.MESSAGE.equals(messageType)) {
+ if (!destination.startsWith(this.destinationPrefix)) {
+ logger.trace("Ignoring message, not a \"user\" destination");
+ return null;
+ }
+ int startIndex = this.destinationPrefix.length();
+ int endIndex = destination.indexOf('/', startIndex);
+ Assert.isTrue(endIndex > 0, "Expected destination pattern \"/user/{userId}/**\"");
+ targetUser = destination.substring(startIndex, endIndex);
+ targetDestination = destination.substring(endIndex);
+
+ }
+ else {
+ logger.trace("Ignoring message, not of the right message type");
+ return null;
+ }
+
+ return new UserDestinationInfo(targetUser, targetDestination);
+ }
+
+
+ private static class UserDestinationInfo {
+
+ private final String user;
+
+ private final String destination;
+
+ private UserDestinationInfo(String user, String destination) {
+ this.user = user;
+ this.destination = destination;
+ }
+
+ private String getUser() {
+ return this.user;
+ }
+
+ private String getDestination() {
+ return this.destination;
+ }
+ }
+
+}
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/DefaultUserSessionRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/DefaultUserSessionRegistry.java
new file mode 100644
index 0000000000..e8dc33b8a7
--- /dev/null
+++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/DefaultUserSessionRegistry.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2002-2013 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.messaging.simp.handler;
+
+import org.springframework.util.Assert;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * A default thread-safe implementation of {@link @UserSessionRegistry}.
+ *
+ * @author Rossen Stoyanchev
+ * @since 4.0
+ */
+public class DefaultUserSessionRegistry implements UserSessionRegistry {
+
+ // userId -> sessionId
+ private final ConcurrentMap> userSessionIds = new ConcurrentHashMap>();
+
+ private final Object lock = new Object();
+
+
+ @Override
+ public Set getSessionIds(String user) {
+ Set set = this.userSessionIds.get(user);
+ return (set != null) ? set : Collections.emptySet();
+ }
+
+ @Override
+ public void registerSessionId(String user, String sessionId) {
+
+ Assert.notNull(user, "'user' is required");
+ Assert.notNull(user, "'sessionId' is required");
+
+ synchronized (this.lock) {
+ Set set = this.userSessionIds.get(user);
+ if (set == null) {
+ set = new CopyOnWriteArraySet();
+ this.userSessionIds.put(user, set);
+ }
+ set.add(sessionId);
+ }
+ }
+
+ @Override
+ public void unregisterSessionId(String userName, String sessionId) {
+
+ Assert.notNull(userName, "'userName' is required");
+ Assert.notNull(userName, "'sessionId' is required");
+
+ synchronized (lock) {
+ Set set = this.userSessionIds.get(userName);
+ if (set != null) {
+ if (set.remove(sessionId) && set.isEmpty()) {
+ this.userSessionIds.remove(userName);
+ }
+ }
+ }
+ }
+
+}
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolver.java
deleted file mode 100644
index 6b3d018b59..0000000000
--- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolver.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright 2002-2013 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.messaging.simp.handler;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-
-/**
- * @author Rossen Stoyanchev
- * @since 4.0
- */
-public class SimpleUserQueueSuffixResolver implements MutableUserQueueSuffixResolver {
-
- // userId -> [sessionId -> queueSuffix]
- private final ConcurrentMap> cache = new ConcurrentHashMap>();
-
-
- @Override
- public void addQueueSuffix(String user, String sessionId, String suffix) {
- Map suffixes = this.cache.get(user);
- if (suffixes == null) {
- suffixes = new ConcurrentHashMap();
- Map prevSuffixes = this.cache.putIfAbsent(user, suffixes);
- if (prevSuffixes != null) {
- suffixes = prevSuffixes;
- }
- }
- suffixes.put(sessionId, suffix);
- }
-
- @Override
- public void removeQueueSuffix(String user, String sessionId) {
- Map suffixes = this.cache.get(user);
- if (suffixes != null) {
- if (suffixes.remove(sessionId) != null) {
- this.cache.remove(user, Collections.emptyMap());
- }
- }
- }
-
- @Override
- public Set getUserQueueSuffixes(String user) {
- Map suffixes = this.cache.get(user);
- return (suffixes != null) ? new HashSet(suffixes.values()) : Collections.emptySet();
- }
-
- @Override
- public String getUserQueueSuffix(String user, String sessionId) {
- Map suffixes = this.cache.get(user);
- if (suffixes != null) {
- return suffixes.get(sessionId);
- }
- return null;
- }
-
-}
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java
index a645a270ec..d1da249399 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java
+++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java
@@ -23,20 +23,19 @@ import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.core.MessageSendingOperations;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
-import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
+import org.springframework.util.CollectionUtils;
+
+import java.util.Set;
/**
- * Supports destinations prefixed with "/user/{username}", transforms the
- * destination to a unique queue to which the user is subscribed, and then sends
- * the message for further processing.
- *
- * The target destination has the prefix removed and a unique queue suffix,
- * resolved via {@link #setUserQueueSuffixResolver(UserQueueSuffixResolver)}, appended.
- * For example a destination such as "/user/john/queue/trade-confirmation" could
- * be transformed to "/queue/trade-confirmation/i9oqdfzo".
+ * Provides support for messages sent to "user" destinations, translating the
+ * destination to one or more user-specific destination(s) and then sending message(s)
+ * with the updated target destination using the provided messaging template.
+ *
+ * See {@link UserDestinationResolver} for more details and examples.
*
* @author Rossen Stoyanchev
* @since 4.0
@@ -45,140 +44,67 @@ public class UserDestinationMessageHandler implements MessageHandler {
private static final Log logger = LogFactory.getLog(UserDestinationMessageHandler.class);
+
private final MessageSendingOperations messagingTemplate;
- private String destinationPrefix = "/user/";
-
- private UserQueueSuffixResolver userQueueSuffixResolver = new SimpleUserQueueSuffixResolver();
+ private final UserDestinationResolver userDestinationResolver;
/**
+ * Create an instance of the handler with the given messaging template and a
+ * user destination resolver.
*
- * @param messagingTemplate
- * @param resolver the resolver to use to find queue suffixes for a user
+ * @param messagingTemplate a messaging template to use for sending messages
+ * with translated user destinations
+ * @param userDestinationResolver the resolver to use to find queue suffixes for a user
*/
public UserDestinationMessageHandler(MessageSendingOperations messagingTemplate,
- UserQueueSuffixResolver userQueueSuffixResolver) {
+ UserDestinationResolver userDestinationResolver) {
Assert.notNull(messagingTemplate, "messagingTemplate is required");
- Assert.notNull(userQueueSuffixResolver, "userQueueSuffixResolver is required");
+ Assert.notNull(userDestinationResolver, "destinationResolver is required");
this.messagingTemplate = messagingTemplate;
- this.userQueueSuffixResolver = userQueueSuffixResolver;
+ this.userDestinationResolver = userDestinationResolver;
}
/**
- * The default prefix is "/user".
- * @param prefix the prefix to set
+ * Return the configured {@link UserDestinationResolver}.
*/
- public void setDestinationPrefix(String prefix) {
- Assert.hasText(prefix, "prefix is required");
- this.destinationPrefix = prefix.endsWith("/") ? prefix : prefix + "/";
+ public UserDestinationResolver getUserDestinationResolver() {
+ return this.userDestinationResolver;
}
/**
- * @return the prefix
- */
- public String getDestinationPrefix() {
- return this.destinationPrefix;
- }
-
- /**
- * @return the resolver for queue suffixes for a user
- */
- public UserQueueSuffixResolver getUserQueueSuffixResolver() {
- return this.userQueueSuffixResolver;
- }
-
- /**
- * @return the messagingTemplate
+ * Return the configured messaging template for sending messages with
+ * translated destinations.
*/
public MessageSendingOperations getMessagingTemplate() {
return this.messagingTemplate;
}
+
@Override
public void handleMessage(Message> message) throws MessagingException {
- SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
- SimpMessageType messageType = headers.getMessageType();
- String destination = headers.getDestination();
-
- if (!SimpMessageType.MESSAGE.equals(messageType)) {
- return;
- }
-
- if (!checkDestination(destination)) {
- return;
- }
-
if (logger.isTraceEnabled()) {
- logger.trace("Processing message to destination " + destination);
+ logger.trace("Handling message " + message);
}
- UserDestinationParser destinationParser = new UserDestinationParser(destination);
- String user = destinationParser.getUser();
-
- if (user == null) {
- if (logger.isErrorEnabled()) {
- logger.error("Ignoring message, expected destination pattern \"" + this.destinationPrefix
- + "{userId}/**\": " + destination);
- }
+ Set destinations = this.userDestinationResolver.resolveDestination(message);
+ if (CollectionUtils.isEmpty(destinations)) {
return;
}
- for (String sessionId : this.userQueueSuffixResolver.getUserQueueSuffixes(user)) {
-
- String targetDestination = destinationParser.getTargetDestination(sessionId);
+ for (String targetDestination : destinations) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Sending message to resolved user destination: " + targetDestination);
+ }
+ SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
headers.setDestination(targetDestination);
message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build();
-
- if (logger.isTraceEnabled()) {
- logger.trace("Sending message to resolved target destination " + targetDestination);
- }
this.messagingTemplate.send(targetDestination, message);
}
}
- private boolean checkDestination(String destination) {
- if (destination != null) {
- if (destination.startsWith(this.destinationPrefix)) {
- return true;
- }
- }
- return false;
- }
-
-
- private class UserDestinationParser {
-
- private final String user;
-
- private final String targetDestination;
-
-
- public UserDestinationParser(String destination) {
-
- int userStartIndex = destinationPrefix.length();
- int userEndIndex = destination.indexOf('/', userStartIndex);
-
- if (userEndIndex > 0) {
- this.user = destination.substring(userStartIndex, userEndIndex);
- this.targetDestination = destination.substring(userEndIndex);
- }
- else {
- this.user = null;
- this.targetDestination = null;
- }
- }
-
- public String getUser() {
- return this.user;
- }
-
- public String getTargetDestination(String sessionId) {
- return (this.targetDestination != null) ? this.targetDestination + sessionId : null;
- }
- }
-
}
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationResolver.java
new file mode 100644
index 0000000000..420c7ec6d7
--- /dev/null
+++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationResolver.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2002-2013 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.messaging.simp.handler;
+
+import org.springframework.messaging.Message;
+
+import java.util.Set;
+
+
+/**
+ * A strategy for resolving unique, user destinations per session. User destinations
+ * provide a user with the ability to subscribe to a queue unique to their session
+ * as well others with the ability to send messages to those queues.
+ *
+ * For example when a user attempts to subscribe to "/user/queue/position-updates",
+ * the destination may be resolved to "/queue/position-updates-useri9oqdfzo" yielding a
+ * unique queue name that does not collide with any other user attempting to do the same.
+ * Subsequently when messages are sent to "/user/{username}/queue/position-updates",
+ * the destination is translated to "/queue/position-updates-useri9oqdfzo".
+ *
+ * @author Rossen Stoyanchev
+ * @since 4.0
+ *
+ * @see UserDestinationMessageHandler
+ */
+public interface UserDestinationResolver {
+
+ /**
+ * Resolve the destination of the message to one or more user/session-specific target
+ * destinations. If the user has multiple sessions, the method may return more than
+ * one target destinations.
+ *
+ * @param message the message to resolve
+ *
+ * @return the resolved unique user destinations or an empty Set if the message
+ * destination is not recognized as a user destination
+ */
+ Set resolveDestination(Message> message);
+
+}
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserQueueSuffixResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserQueueSuffixResolver.java
deleted file mode 100644
index 0acf82c253..0000000000
--- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserQueueSuffixResolver.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2002-2013 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.messaging.simp.handler;
-
-import java.util.Set;
-
-
-/**
- * A strategy for resolving unique queue suffixes for a connected user.
- * There can be only one suffix per user per session.
- *
- * @author Rossen Stoyanchev
- * @since 4.0
- */
-public interface UserQueueSuffixResolver {
-
- /**
- * Retrieve the suffixes for all sessions associated with this user.
- *
- * @param user the user name
- * @return a Set with zero, one, or more, queue suffixes
- */
- Set getUserQueueSuffixes(String user);
-
- /**
- * Retrieve the queue suffix associated with the given user session.
- *
- * @param user the user name
- * @param sessionId the session id
- * @return a queue suffix or {@code null}
- */
- String getUserQueueSuffix(String user, String sessionId);
-
-}
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserSessionRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserSessionRegistry.java
new file mode 100644
index 0000000000..42e9272b0f
--- /dev/null
+++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserSessionRegistry.java
@@ -0,0 +1,44 @@
+package org.springframework.messaging.simp.handler;
+
+import java.util.Set;
+
+/**
+ * A registry for looking up active session id's by user.
+ *
+ * Used in support of resolving unique session-specific user destinations.
+ * See {@link DefaultUserDestinationResolver} for more details.
+ *
+ * @author Rossen Stoyanchev
+ * @since 4.0
+ *
+ * @see DefaultUserDestinationResolver
+ */
+public interface UserSessionRegistry {
+
+
+ /**
+ * Return the active session id's for the given user.
+ *
+ * @param user the user
+ * @return a set with 0 or more session id's
+ */
+ Set getSessionIds(String user);
+
+ /**
+ * Register an active session id for the given user.
+ *
+ * @param user the user
+ * @param sessionId the session id
+ */
+ void registerSessionId(String user, String sessionId);
+
+ /**
+ * Unregister the session id for a user.
+ *
+ * @param user the user
+ * @param sessionId the session id
+ */
+ void unregisterSessionId(String user, String sessionId);
+
+
+}
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java
index 2a4b17cdbb..17e4d51ebe 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java
+++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java
@@ -30,8 +30,7 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.websocket.SubProtocolHandler;
import org.springframework.messaging.simp.SimpMessageType;
-import org.springframework.messaging.simp.handler.MutableUserQueueSuffixResolver;
-import org.springframework.messaging.simp.handler.SimpleUserQueueSuffixResolver;
+import org.springframework.messaging.simp.handler.UserSessionRegistry;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.web.socket.CloseStatus;
@@ -45,6 +44,7 @@ import org.springframework.web.socket.WebSocketSession;
*
* @author Rossen Stoyanchev
* @author Andy Wilkinson
+ * @since 4.0
*/
public class StompProtocolHandler implements SubProtocolHandler {
@@ -54,36 +54,30 @@ public class StompProtocolHandler implements SubProtocolHandler {
*/
public static final String CONNECTED_USER_HEADER = "user-name";
- /**
- * A suffix unique to the current session that a client can use to append to
- * a destination to make it unique.
- *
- * @see {@link org.springframework.messaging.simp.handler.UserDestinationMessageHandler}
- */
- public static final String QUEUE_SUFFIX_HEADER = "queue-suffix";
+ private static final Log logger = LogFactory.getLog(StompProtocolHandler.class);
- private final Log logger = LogFactory.getLog(StompProtocolHandler.class);
private final StompDecoder stompDecoder = new StompDecoder();
private final StompEncoder stompEncoder = new StompEncoder();
- private MutableUserQueueSuffixResolver queueSuffixResolver = new SimpleUserQueueSuffixResolver();
+ private UserSessionRegistry userSessionRegistry;
/**
- * Configure a resolver to use to maintain queue suffixes for user
+ * Provide a registry with which to register active user session ids.
+ *
* @see {@link org.springframework.messaging.simp.handler.UserDestinationMessageHandler}
*/
- public void setUserQueueSuffixResolver(MutableUserQueueSuffixResolver resolver) {
- this.queueSuffixResolver = resolver;
+ public void setUserSessionRegistry(UserSessionRegistry registry) {
+ this.userSessionRegistry = registry;
}
/**
- * @return the resolver for queue suffixes for a user
+ * @return the configured UserSessionRegistry.
*/
- public MutableUserQueueSuffixResolver getUserQueueSuffixResolver() {
- return this.queueSuffixResolver;
+ public UserSessionRegistry getUserSessionRegistry() {
+ return this.userSessionRegistry;
}
@Override
@@ -162,7 +156,7 @@ public class StompProtocolHandler implements SubProtocolHandler {
}
if (headers.getCommand() == StompCommand.CONNECTED) {
- augmentConnectedHeaders(headers, session);
+ afterStompSessionConnected(headers, session);
}
if (StompCommand.MESSAGE.equals(headers.getCommand()) && (headers.getSubscriptionId() == null)) {
@@ -189,6 +183,7 @@ public class StompProtocolHandler implements SubProtocolHandler {
session.close(CloseStatus.PROTOCOL_ERROR);
}
catch (IOException e) {
+ // Ignore
}
}
}
@@ -216,15 +211,13 @@ public class StompProtocolHandler implements SubProtocolHandler {
}
}
- private void augmentConnectedHeaders(StompHeaderAccessor headers, WebSocketSession session) {
+ private void afterStompSessionConnected(StompHeaderAccessor headers, WebSocketSession session) {
Principal principal = session.getPrincipal();
if (principal != null) {
headers.setNativeHeader(CONNECTED_USER_HEADER, principal.getName());
- headers.setNativeHeader(QUEUE_SUFFIX_HEADER, session.getId());
-
- if (this.queueSuffixResolver != null) {
+ if (this.userSessionRegistry != null) {
String suffix = session.getId();
- this.queueSuffixResolver.addQueueSuffix(principal.getName(), session.getId(), suffix);
+ this.userSessionRegistry.registerSessionId(principal.getName(), session.getId());
}
}
}
@@ -242,8 +235,8 @@ public class StompProtocolHandler implements SubProtocolHandler {
@Override
public void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus, MessageChannel outputChannel) {
- if ((this.queueSuffixResolver != null) && (session.getPrincipal() != null)) {
- this.queueSuffixResolver.removeQueueSuffix(session.getPrincipal().getName(), session.getId());
+ if ((this.userSessionRegistry != null) && (session.getPrincipal() != null)) {
+ this.userSessionRegistry.unregisterSessionId(session.getPrincipal().getName(), session.getId());
}
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserQueueSuffixResolver.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/TestPrincipal.java
similarity index 51%
rename from spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserQueueSuffixResolver.java
rename to spring-messaging/src/test/java/org/springframework/messaging/simp/TestPrincipal.java
index 1fe326958d..ec03cc7dcd 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserQueueSuffixResolver.java
+++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/TestPrincipal.java
@@ -14,17 +14,44 @@
* limitations under the License.
*/
-package org.springframework.messaging.simp.handler;
+package org.springframework.messaging.simp;
+
+import java.security.Principal;
/**
+ * An implementation of {@link Principal} for testing.
+ *
* @author Rossen Stoyanchev
- * @since 4.0
*/
-public interface MutableUserQueueSuffixResolver extends UserQueueSuffixResolver {
+public class TestPrincipal implements Principal {
- void addQueueSuffix(String user, String sessionId, String suffix);
+ private String name;
- void removeQueueSuffix(String user, String sessionId);
+ public TestPrincipal(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String getName() {
+ return this.name;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof TestPrincipal)) {
+ return false;
+ }
+ TestPrincipal p = (TestPrincipal) obj;
+ return this.name.equals(p.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return this.name.hashCode();
+ }
}
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistryTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistryTests.java
index 7531e8ed20..480e9f34c7 100644
--- a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistryTests.java
+++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistryTests.java
@@ -24,8 +24,8 @@ import org.mockito.Mockito;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.websocket.SubProtocolHandler;
import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler;
-import org.springframework.messaging.simp.handler.MutableUserQueueSuffixResolver;
-import org.springframework.messaging.simp.handler.SimpleUserQueueSuffixResolver;
+import org.springframework.messaging.simp.handler.DefaultUserSessionRegistry;
+import org.springframework.messaging.simp.handler.UserSessionRegistry;
import org.springframework.messaging.simp.stomp.StompProtocolHandler;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.web.servlet.handler.SimpleUrlHandlerMapping;
@@ -44,16 +44,16 @@ public class ServletStompEndpointRegistryTests {
private SubProtocolWebSocketHandler webSocketHandler;
- private MutableUserQueueSuffixResolver queueSuffixResolver;
+ private UserSessionRegistry userSessionRegistry;
@Before
public void setup() {
MessageChannel channel = Mockito.mock(MessageChannel.class);
this.webSocketHandler = new SubProtocolWebSocketHandler(channel);
- this.queueSuffixResolver = new SimpleUserQueueSuffixResolver();
+ this.userSessionRegistry = new DefaultUserSessionRegistry();
TaskScheduler taskScheduler = Mockito.mock(TaskScheduler.class);
- this.registry = new ServletStompEndpointRegistry(webSocketHandler, queueSuffixResolver, taskScheduler);
+ this.registry = new ServletStompEndpointRegistry(webSocketHandler, userSessionRegistry, taskScheduler);
}
@@ -69,7 +69,7 @@ public class ServletStompEndpointRegistryTests {
assertNotNull(protocolHandlers.get("v12.stomp"));
StompProtocolHandler stompHandler = (StompProtocolHandler) protocolHandlers.get("v10.stomp");
- assertSame(this.queueSuffixResolver, stompHandler.getUserQueueSuffixResolver());
+ assertSame(this.userSessionRegistry, stompHandler.getUserSessionRegistry());
}
@Test
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupportTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupportTests.java
index fdf6ad7499..bf01420b39 100644
--- a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupportTests.java
+++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupportTests.java
@@ -35,9 +35,9 @@ import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandl
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.annotation.SubscribeEvent;
import org.springframework.messaging.simp.handler.SimpAnnotationMethodMessageHandler;
-import org.springframework.messaging.simp.handler.MutableUserQueueSuffixResolver;
import org.springframework.messaging.simp.handler.SimpleBrokerMessageHandler;
import org.springframework.messaging.simp.handler.UserDestinationMessageHandler;
+import org.springframework.messaging.simp.handler.UserSessionRegistry;
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
@@ -259,7 +259,7 @@ public class WebSocketMessageBrokerConfigurationSupportTests {
SubscribableChannel channel = this.cxtSimpleBroker.getBean("brokerChannel", SubscribableChannel.class);
UserDestinationMessageHandler messageHandler = this.cxtSimpleBroker.getBean(UserDestinationMessageHandler.class);
- this.cxtSimpleBroker.getBean(MutableUserQueueSuffixResolver.class).addQueueSuffix("joe", "s1", "s1");
+ this.cxtSimpleBroker.getBean(UserSessionRegistry.class).registerSessionId("joe", "s1");
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.setDestination("/user/joe/foo");
@@ -274,7 +274,7 @@ public class WebSocketMessageBrokerConfigurationSupportTests {
headers = StompHeaderAccessor.wrap(message);
assertEquals(SimpMessageType.MESSAGE, headers.getMessageType());
- assertEquals("/foos1", headers.getDestination());
+ assertEquals("/foo-users1", headers.getDestination());
}
@Test
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/DefaultUserDestinationResolverTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/DefaultUserDestinationResolverTests.java
new file mode 100644
index 0000000000..4a62301f12
--- /dev/null
+++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/DefaultUserDestinationResolverTests.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2002-2013 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.messaging.simp.handler;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
+import org.springframework.messaging.simp.SimpMessageType;
+import org.springframework.messaging.simp.TestPrincipal;
+import org.springframework.messaging.support.MessageBuilder;
+
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link DefaultUserDestinationResolver}.
+ */
+public class DefaultUserDestinationResolverTests {
+
+ private DefaultUserDestinationResolver resolver;
+
+ private UserSessionRegistry registry;
+
+
+ @Before
+ public void setup() {
+ this.registry = new DefaultUserSessionRegistry();
+ this.resolver = new DefaultUserDestinationResolver(this.registry);
+ }
+
+
+ @Test
+ public void handleSubscribe() {
+ Message> message = createMessage(SimpMessageType.SUBSCRIBE, "joe", "/user/queue/foo");
+ this.registry.registerSessionId("joe", "123");
+ Set actual = this.resolver.resolveDestination(message);
+
+ assertEquals(1, actual.size());
+ assertEquals("/queue/foo-user123", actual.iterator().next());
+ }
+
+ @Test
+ public void handleUnsubscribe() {
+ Message> message = createMessage(SimpMessageType.UNSUBSCRIBE, "joe", "/user/queue/foo");
+ this.registry.registerSessionId("joe", "123");
+ Set actual = this.resolver.resolveDestination(message);
+
+ assertEquals(1, actual.size());
+ assertEquals("/queue/foo-user123", actual.iterator().next());
+ }
+
+ @Test
+ public void handleMessage() {
+ Message> message = createMessage(SimpMessageType.MESSAGE, "joe", "/user/joe/queue/foo");
+ this.registry.registerSessionId("joe", "123");
+ Set actual = this.resolver.resolveDestination(message);
+
+ assertEquals(1, actual.size());
+ assertEquals("/queue/foo-user123", actual.iterator().next());
+ }
+
+
+ @Test
+ public void ignoreMessage() {
+
+ // no destination
+ Message> message = createMessage(SimpMessageType.MESSAGE, "joe", null);
+ Set actual = this.resolver.resolveDestination(message);
+ assertEquals(0, actual.size());
+
+ // not a user destination
+ message = createMessage(SimpMessageType.MESSAGE, "joe", "/queue/foo");
+ actual = this.resolver.resolveDestination(message);
+ assertEquals(0, actual.size());
+
+ // subscribe + no user
+ message = createMessage(SimpMessageType.SUBSCRIBE, null, "/user/queue/foo");
+ actual = this.resolver.resolveDestination(message);
+ assertEquals(0, actual.size());
+
+ // subscribe + not a user destination
+ message = createMessage(SimpMessageType.SUBSCRIBE, "joe", "/queue/foo");
+ actual = this.resolver.resolveDestination(message);
+ assertEquals(0, actual.size());
+
+ // no match on message type
+ message = createMessage(SimpMessageType.CONNECT, "joe", "user/joe/queue/foo");
+ actual = this.resolver.resolveDestination(message);
+ assertEquals(0, actual.size());
+ }
+
+
+ private Message> createMessage(SimpMessageType messageType, String user, String destination) {
+ SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(messageType);
+ if (destination != null) {
+ headers.setDestination(destination);
+ }
+ if (user != null) {
+ headers.setUser(new TestPrincipal(user));
+ }
+ return MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
+ }
+
+}
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolverTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/DefaultUserSessionRegistryTests.java
similarity index 54%
rename from spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolverTests.java
rename to spring-messaging/src/test/java/org/springframework/messaging/simp/handler/DefaultUserSessionRegistryTests.java
index 8a2386640c..e03a8dfdf1 100644
--- a/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolverTests.java
+++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/DefaultUserSessionRegistryTests.java
@@ -27,56 +27,57 @@ import static org.junit.Assert.*;
/**
- * Test fixture for {@link SimpleUserQueueSuffixResolver}
+ * Test fixture for {@link DefaultUserSessionRegistry}
*
* @author Rossen Stoyanchev
* @since 4.0
*/
-public class SimpleUserQueueSuffixResolverTests {
+public class DefaultUserSessionRegistryTests {
private static final String user = "joe";
+
private static final List sessionIds = Arrays.asList("sess01", "sess02", "sess03");
@Test
public void addOneSessionId() {
- SimpleUserQueueSuffixResolver resolver = new SimpleUserQueueSuffixResolver();
- resolver.addQueueSuffix(user, sessionIds.get(0), sessionIds.get(0));
+ DefaultUserSessionRegistry resolver = new DefaultUserSessionRegistry();
+ resolver.registerSessionId(user, sessionIds.get(0));
- assertEquals(Collections.singleton(sessionIds.get(0)), resolver.getUserQueueSuffixes(user));
- assertSame(Collections.emptySet(), resolver.getUserQueueSuffixes("jane"));
+ assertEquals(Collections.singleton(sessionIds.get(0)), resolver.getSessionIds(user));
+ assertSame(Collections.emptySet(), resolver.getSessionIds("jane"));
}
@Test
public void addMultipleSessionIds() {
- SimpleUserQueueSuffixResolver resolver = new SimpleUserQueueSuffixResolver();
+ DefaultUserSessionRegistry resolver = new DefaultUserSessionRegistry();
for (String sessionId : sessionIds) {
- resolver.addQueueSuffix(user, sessionId, sessionId);
+ resolver.registerSessionId(user, sessionId);
}
- assertEquals(new LinkedHashSet<>(sessionIds), resolver.getUserQueueSuffixes(user));
- assertEquals(Collections.emptySet(), resolver.getUserQueueSuffixes("jane"));
+ assertEquals(new LinkedHashSet<>(sessionIds), resolver.getSessionIds(user));
+ assertEquals(Collections.emptySet(), resolver.getSessionIds("jane"));
}
@Test
public void removeSessionIds() {
- SimpleUserQueueSuffixResolver resolver = new SimpleUserQueueSuffixResolver();
+ DefaultUserSessionRegistry resolver = new DefaultUserSessionRegistry();
for (String sessionId : sessionIds) {
- resolver.addQueueSuffix(user, sessionId, sessionId);
+ resolver.registerSessionId(user, sessionId);
}
- assertEquals(new LinkedHashSet<>(sessionIds), resolver.getUserQueueSuffixes(user));
+ assertEquals(new LinkedHashSet<>(sessionIds), resolver.getSessionIds(user));
- resolver.removeQueueSuffix(user, sessionIds.get(1));
- resolver.removeQueueSuffix(user, sessionIds.get(2));
- assertEquals(Collections.singleton(sessionIds.get(0)), resolver.getUserQueueSuffixes(user));
+ resolver.unregisterSessionId(user, sessionIds.get(1));
+ resolver.unregisterSessionId(user, sessionIds.get(2));
+ assertEquals(Collections.singleton(sessionIds.get(0)), resolver.getSessionIds(user));
- resolver.removeQueueSuffix(user, sessionIds.get(0));
- assertSame(Collections.emptySet(), resolver.getUserQueueSuffixes(user));
+ resolver.unregisterSessionId(user, sessionIds.get(0));
+ assertSame(Collections.emptySet(), resolver.getSessionIds(user));
}
}
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandlerTests.java
new file mode 100644
index 0000000000..c5324e2c36
--- /dev/null
+++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandlerTests.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2002-2013 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.messaging.simp.handler;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.core.MessageSendingOperations;
+import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
+import org.springframework.messaging.simp.SimpMessageType;
+import org.springframework.messaging.simp.TestPrincipal;
+import org.springframework.messaging.support.MessageBuilder;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link UserDestinationMessageHandler}.
+ */
+public class UserDestinationMessageHandlerTests {
+
+ private UserDestinationMessageHandler messageHandler;
+
+ private MessageSendingOperations messagingTemplate;
+
+ private UserSessionRegistry registry;
+
+
+ @Before
+ public void setup() {
+ this.messagingTemplate = Mockito.mock(MessageSendingOperations.class);
+ this.registry = new DefaultUserSessionRegistry();
+ DefaultUserDestinationResolver resolver = new DefaultUserDestinationResolver(this.registry);
+ this.messageHandler = new UserDestinationMessageHandler(this.messagingTemplate, resolver);
+ }
+
+
+ @Test
+ public void handleSubscribe() {
+ this.registry.registerSessionId("joe", "123");
+ this.messageHandler.handleMessage(createMessage(SimpMessageType.SUBSCRIBE, "joe", "/user/queue/foo"));
+
+ ArgumentCaptor captor1 = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor captor2 = ArgumentCaptor.forClass(Message.class);
+ Mockito.verify(this.messagingTemplate).send(captor1.capture(), captor2.capture());
+
+ assertEquals("/queue/foo-user123", captor1.getValue());
+ }
+
+ @Test
+ public void handleUnsubscribe() {
+ this.registry.registerSessionId("joe", "123");
+ this.messageHandler.handleMessage(createMessage(SimpMessageType.UNSUBSCRIBE, "joe", "/user/queue/foo"));
+
+ ArgumentCaptor captor1 = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor captor2 = ArgumentCaptor.forClass(Message.class);
+ Mockito.verify(this.messagingTemplate).send(captor1.capture(), captor2.capture());
+
+ assertEquals("/queue/foo-user123", captor1.getValue());
+ }
+
+ @Test
+ public void handleMessage() {
+ this.registry.registerSessionId("joe", "123");
+ this.messageHandler.handleMessage(createMessage(SimpMessageType.MESSAGE, "joe", "/user/joe/queue/foo"));
+
+ ArgumentCaptor captor1 = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor captor2 = ArgumentCaptor.forClass(Message.class);
+ Mockito.verify(this.messagingTemplate).send(captor1.capture(), captor2.capture());
+
+ assertEquals("/queue/foo-user123", captor1.getValue());
+ }
+
+
+ @Test
+ public void ignoreMessage() {
+
+ // no destination
+ this.messageHandler.handleMessage(createMessage(SimpMessageType.MESSAGE, "joe", null));
+ Mockito.verifyZeroInteractions(this.messagingTemplate);
+
+ // not a user destination
+ this.messageHandler.handleMessage(createMessage(SimpMessageType.MESSAGE, "joe", "/queue/foo"));
+ Mockito.verifyZeroInteractions(this.messagingTemplate);
+
+ // subscribe + no user
+ this.messageHandler.handleMessage(createMessage(SimpMessageType.SUBSCRIBE, null, "/user/queue/foo"));
+ Mockito.verifyZeroInteractions(this.messagingTemplate);
+
+ // subscribe + not a user destination
+ this.messageHandler.handleMessage(createMessage(SimpMessageType.SUBSCRIBE, "joe", "/queue/foo"));
+ Mockito.verifyZeroInteractions(this.messagingTemplate);
+
+ // no match on message type
+ this.messageHandler.handleMessage(createMessage(SimpMessageType.CONNECT, "joe", "user/joe/queue/foo"));
+ Mockito.verifyZeroInteractions(this.messagingTemplate);
+ }
+
+
+ private Message> createMessage(SimpMessageType messageType, String user, String destination) {
+ SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(messageType);
+ if (destination != null) {
+ headers.setDestination(destination);
+ }
+ if (user != null) {
+ headers.setUser(new TestPrincipal(user));
+ }
+ return MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
+ }
+
+}
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompProtocolHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompProtocolHandlerTests.java
index 74c5dd8976..bcc5ad7e05 100644
--- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompProtocolHandlerTests.java
+++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompProtocolHandlerTests.java
@@ -28,9 +28,9 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
+import org.springframework.messaging.simp.TestPrincipal;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.socket.TextMessage;
-import org.springframework.web.socket.support.TestPrincipal;
import org.springframework.web.socket.support.TestWebSocketSession;
import static org.junit.Assert.*;
@@ -90,7 +90,6 @@ public class StompProtocolHandlerTests {
assertEquals("1.1", replyHeaders.getVersion());
assertArrayEquals(new long[] {0, 0}, replyHeaders.getHeartbeat());
assertEquals("joe", replyHeaders.getNativeHeader("user-name").get(0));
- assertEquals("s1", replyHeaders.getNativeHeader("queue-suffix").get(0));
}
@Test
diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/support/TestPrincipal.java b/spring-websocket/src/test/java/org/springframework/web/socket/support/TestPrincipal.java
index 866bdba2ba..f6a755ebd9 100644
--- a/spring-websocket/src/test/java/org/springframework/web/socket/support/TestPrincipal.java
+++ b/spring-websocket/src/test/java/org/springframework/web/socket/support/TestPrincipal.java
@@ -20,7 +20,8 @@ import java.security.Principal;
/**
- * An implementation of Prinicipal for testing.
+ * An implementation of {@link Principal} for testing.
+ *
* @author Rossen Stoyanchev
*/
public class TestPrincipal implements Principal {