Update user destinations handling

Before this change subscribing to a user-specific destination in STOMP
required manually appending a unique queue suffix provided in a header
with the CONNECTED frame.

This change removes the need to do that. Instead STOMP clients can
subscribe to "/user/queue/error" and can then begin to receive messages
sent to "/user/{username}/queue/error" without colliding with any other
user doing the same.

Issue: SPR-11077
This commit is contained in:
Rossen Stoyanchev
2013-11-07 17:33:13 -05:00
parent c09f85172b
commit 0340cc5f03
20 changed files with 783 additions and 300 deletions

View File

@@ -39,6 +39,8 @@ public class MessageBrokerConfigurer {
private String[] annotationMethodDestinationPrefixes;
private String userDestinationPrefix;
public MessageBrokerConfigurer(MessageChannel webSocketResponseChannel) {
Assert.notNull(webSocketResponseChannel);
@@ -81,6 +83,26 @@ public class MessageBrokerConfigurer {
return this;
}
/**
* Configure the prefix used to identify user destinations. User destinations
* provide the ability for a user to subscribe to queue names unique to their
* session as well as for others to send messages to those unique,
* user-specific queues.
* <p>
* For example when a user attempts to subscribe to "/user/queue/position-updates",
* the destination may be translated to "/queue/position-updatesi9oqdfzo" yielding a
* unique queue name that does not collide with any other user attempting to do the same.
* Subsequently when messages are sent to "/user/{username}/queue/position-updates",
* the destination is translated to "/queue/position-updatesi9oqdfzo".
* <p>
* The default prefix used to identify such destinations is "/user/".
*/
public MessageBrokerConfigurer setUserDestinationPrefix(String destinationPrefix) {
this.userDestinationPrefix = destinationPrefix;
return this;
}
protected AbstractBrokerMessageHandler getSimpleBroker() {
initSimpleBrokerIfNecessary();
return (this.simpleBroker != null) ? this.simpleBroker.getMessageHandler() : null;
@@ -100,4 +122,8 @@ public class MessageBrokerConfigurer {
return (this.annotationMethodDestinationPrefixes != null)
? Arrays.asList(this.annotationMethodDestinationPrefixes) : null;
}
protected String getUserDestinationPrefix() {
return this.userDestinationPrefix;
}
}

View File

@@ -22,7 +22,7 @@ import java.util.List;
import java.util.Map;
import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler;
import org.springframework.messaging.simp.handler.MutableUserQueueSuffixResolver;
import org.springframework.messaging.simp.handler.UserSessionRegistry;
import org.springframework.messaging.simp.stomp.StompProtocolHandler;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
@@ -54,15 +54,15 @@ public class ServletStompEndpointRegistry implements StompEndpointRegistry {
public ServletStompEndpointRegistry(WebSocketHandler webSocketHandler,
MutableUserQueueSuffixResolver userQueueSuffixResolver, TaskScheduler defaultSockJsTaskScheduler) {
UserSessionRegistry userSessionRegistry, TaskScheduler defaultSockJsTaskScheduler) {
Assert.notNull(webSocketHandler);
Assert.notNull(userQueueSuffixResolver);
Assert.notNull(userSessionRegistry);
this.webSocketHandler = webSocketHandler;
this.subProtocolWebSocketHandler = findSubProtocolWebSocketHandler(webSocketHandler);
this.stompHandler = new StompProtocolHandler();
this.stompHandler.setUserQueueSuffixResolver(userQueueSuffixResolver);
this.stompHandler.setUserSessionRegistry(userSessionRegistry);
this.sockJsScheduler = defaultSockJsTaskScheduler;
}

View File

@@ -68,8 +68,9 @@ public abstract class WebSocketMessageBrokerConfigurationSupport {
@Bean
public HandlerMapping brokerWebSocketHandlerMapping() {
ServletStompEndpointRegistry registry = new ServletStompEndpointRegistry(subProtocolWebSocketHandler(),
userQueueSuffixResolver(), brokerDefaultSockJsTaskScheduler());
ServletStompEndpointRegistry registry = new ServletStompEndpointRegistry(
subProtocolWebSocketHandler(), userSessionRegistry(), brokerDefaultSockJsTaskScheduler());
registerStompEndpoints(registry);
AbstractHandlerMapping hm = registry.getHandlerMapping();
@@ -85,13 +86,13 @@ public abstract class WebSocketMessageBrokerConfigurationSupport {
}
@Bean
public MutableUserQueueSuffixResolver userQueueSuffixResolver() {
return new SimpleUserQueueSuffixResolver();
public UserSessionRegistry userSessionRegistry() {
return new DefaultUserSessionRegistry();
}
/**
* The default TaskScheduler to use if none is configured via
* {@link SockJsServiceRegistration#setTaskScheduler()}, i.e.
* {@link SockJsServiceRegistration#setTaskScheduler(org.springframework.scheduling.TaskScheduler)}, i.e.
* <pre class="code">
* &#064;Configuration
* &#064;EnableWebSocketMessageBroker
@@ -137,8 +138,10 @@ public abstract class WebSocketMessageBrokerConfigurationSupport {
@Bean
public SimpAnnotationMethodMessageHandler annotationMethodMessageHandler() {
SimpAnnotationMethodMessageHandler handler =
new SimpAnnotationMethodMessageHandler(brokerMessagingTemplate(), webSocketResponseChannel());
handler.setDestinationPrefixes(getMessageBrokerConfigurer().getAnnotationMethodDestinationPrefixes());
handler.setMessageConverter(simpMessageConverter());
webSocketRequestChannel().subscribe(handler);
@@ -185,8 +188,10 @@ public abstract class WebSocketMessageBrokerConfigurationSupport {
@Bean
public UserDestinationMessageHandler userDestinationMessageHandler() {
UserDestinationMessageHandler handler = new UserDestinationMessageHandler(
brokerMessagingTemplate(), userQueueSuffixResolver());
brokerMessagingTemplate(), userDestinationResolver());
webSocketRequestChannel().subscribe(handler);
brokerChannel().subscribe(handler);
return handler;
@@ -195,6 +200,10 @@ public abstract class WebSocketMessageBrokerConfigurationSupport {
@Bean
public SimpMessageSendingOperations brokerMessagingTemplate() {
SimpMessagingTemplate template = new SimpMessagingTemplate(brokerChannel());
String userDestinationPrefix = getMessageBrokerConfigurer().getUserDestinationPrefix();
if (userDestinationPrefix != null) {
template.setUserDestinationPrefix(userDestinationPrefix);
}
template.setMessageConverter(simpMessageConverter());
return template;
}
@@ -220,6 +229,16 @@ public abstract class WebSocketMessageBrokerConfigurationSupport {
return new CompositeMessageConverter(converters, contentTypeResolver);
}
@Bean
public UserDestinationResolver userDestinationResolver() {
DefaultUserDestinationResolver resolver = new DefaultUserDestinationResolver(userSessionRegistry());
String prefix = getMessageBrokerConfigurer().getUserDestinationPrefix();
if (prefix != null) {
resolver.setUserDestinationPrefix(prefix);
}
return resolver;
}
private static final AbstractBrokerMessageHandler noopBroker = new AbstractBrokerMessageHandler(null) {

View File

@@ -108,7 +108,11 @@ public abstract class AbstractSubscriptionRegistry implements SubscriptionRegist
if (logger.isTraceEnabled()) {
logger.trace("Find subscriptions, destination=" + headers.getDestination());
}
return findSubscriptionsInternal(destination, message);
MultiValueMap<String, String> result = findSubscriptionsInternal(destination, message);
if (logger.isTraceEnabled()) {
logger.trace("Found " + result.size() + " subscriptions");
}
return result;
}
protected abstract MultiValueMap<String, String> findSubscriptionsInternal(

View File

@@ -0,0 +1,187 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.handler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.util.Assert;
import java.security.Principal;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
* A default implementation of {@link @UserDestinationResolver}.
* <p>
* Resolves messages sent to destination patterns "/user/{user-name}/**" as well as
* subscriptions to destinations "/user/queue/**" where the "/user/" prefix used to
* recognize such destinations is customizable via
* {@link #setUserDestinationPrefix(String)}.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class DefaultUserDestinationResolver implements UserDestinationResolver {
private static final Log logger = LogFactory.getLog(DefaultUserDestinationResolver.class);
private final UserSessionRegistry userSessionRegistry;
private String destinationPrefix = "/user/";
private String subscriptionDestinationPrefix = "/user/queue/";
/**
* Create an instance that will access user session id information through
* the provided registry.
*
* @param userSessionRegistry the registry, never {@code null}
*/
public DefaultUserDestinationResolver(UserSessionRegistry userSessionRegistry) {
Assert.notNull(userSessionRegistry, "'userSessionRegistry' is required");
this.userSessionRegistry = userSessionRegistry;
}
/**
* The prefix used to identify user destinations. Any destinations that do not
* start with the given prefix are not be resolved.
* <p>
* The default value is "/user/".
*
* @param prefix the prefix to use
*/
public void setUserDestinationPrefix(String prefix) {
Assert.hasText(prefix, "prefix is required");
this.destinationPrefix = prefix.endsWith("/") ? prefix : prefix + "/";
this.subscriptionDestinationPrefix = this.destinationPrefix + "queue/";
}
/**
* Return the prefix used to identify user destinations. Any destinations that do not
* start with the given prefix are not be resolved.
* <p>
* By default "/user/queue/".
*/
public String getDestinationPrefix() {
return this.destinationPrefix;
}
/**
* Return the prefix used to identify user destinations for (un)subscribe messages.
* <p>
* By default "/user/queue/".
*/
public String getSubscriptionDestinationPrefix() {
return this.subscriptionDestinationPrefix;
}
@Override
public Set<String> resolveDestination(Message<?> message) {
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
UserDestinationInfo info = getUserDestinationInfo(headers);
if (info == null) {
return Collections.emptySet();
}
Set<String> set = new HashSet<String>();
for (String sessionId : this.userSessionRegistry.getSessionIds(info.getUser())) {
set.add(getTargetDestination(headers.getDestination(), info.getDestination(), sessionId, info.getUser()));
}
return set;
}
protected String getTargetDestination(String originalDestination, String targetDestination,
String sessionId, String user) {
return targetDestination + "-user" + sessionId;
}
private UserDestinationInfo getUserDestinationInfo(SimpMessageHeaderAccessor headers) {
String destination = headers.getDestination();
if (destination == null) {
logger.trace("Ignoring message, no destination");
return null;
}
String targetUser;
String targetDestination;
Principal user = headers.getUser();
SimpMessageType messageType = headers.getMessageType();
if (SimpMessageType.SUBSCRIBE.equals(messageType) || SimpMessageType.UNSUBSCRIBE.equals(messageType)) {
if (user == null) {
logger.trace("Ignoring (un)subscribe message, no user information");
return null;
}
if (!destination.startsWith(this.subscriptionDestinationPrefix)) {
logger.trace("Ignoring (un)subscribe message, not a \"user\" destination");
return null;
}
targetUser = user.getName();
targetDestination = destination.substring(this.destinationPrefix.length()-1);
}
else if (SimpMessageType.MESSAGE.equals(messageType)) {
if (!destination.startsWith(this.destinationPrefix)) {
logger.trace("Ignoring message, not a \"user\" destination");
return null;
}
int startIndex = this.destinationPrefix.length();
int endIndex = destination.indexOf('/', startIndex);
Assert.isTrue(endIndex > 0, "Expected destination pattern \"/user/{userId}/**\"");
targetUser = destination.substring(startIndex, endIndex);
targetDestination = destination.substring(endIndex);
}
else {
logger.trace("Ignoring message, not of the right message type");
return null;
}
return new UserDestinationInfo(targetUser, targetDestination);
}
private static class UserDestinationInfo {
private final String user;
private final String destination;
private UserDestinationInfo(String user, String destination) {
this.user = user;
this.destination = destination;
}
private String getUser() {
return this.user;
}
private String getDestination() {
return this.destination;
}
}
}

View File

@@ -0,0 +1,79 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.handler;
import org.springframework.util.Assert;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* A default thread-safe implementation of {@link @UserSessionRegistry}.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class DefaultUserSessionRegistry implements UserSessionRegistry {
// userId -> sessionId
private final ConcurrentMap<String, Set<String>> userSessionIds = new ConcurrentHashMap<String, Set<String>>();
private final Object lock = new Object();
@Override
public Set<String> getSessionIds(String user) {
Set<String> set = this.userSessionIds.get(user);
return (set != null) ? set : Collections.<String>emptySet();
}
@Override
public void registerSessionId(String user, String sessionId) {
Assert.notNull(user, "'user' is required");
Assert.notNull(user, "'sessionId' is required");
synchronized (this.lock) {
Set<String> set = this.userSessionIds.get(user);
if (set == null) {
set = new CopyOnWriteArraySet<String>();
this.userSessionIds.put(user, set);
}
set.add(sessionId);
}
}
@Override
public void unregisterSessionId(String userName, String sessionId) {
Assert.notNull(userName, "'userName' is required");
Assert.notNull(userName, "'sessionId' is required");
synchronized (lock) {
Set<String> set = this.userSessionIds.get(userName);
if (set != null) {
if (set.remove(sessionId) && set.isEmpty()) {
this.userSessionIds.remove(userName);
}
}
}
}
}

View File

@@ -1,30 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.handler;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface MutableUserQueueSuffixResolver extends UserQueueSuffixResolver {
void addQueueSuffix(String user, String sessionId, String suffix);
void removeQueueSuffix(String user, String sessionId);
}

View File

@@ -1,75 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.handler;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class SimpleUserQueueSuffixResolver implements MutableUserQueueSuffixResolver {
// userId -> [sessionId -> queueSuffix]
private final ConcurrentMap<String, Map<String, String>> cache = new ConcurrentHashMap<String, Map<String, String>>();
@Override
public void addQueueSuffix(String user, String sessionId, String suffix) {
Map<String, String> suffixes = this.cache.get(user);
if (suffixes == null) {
suffixes = new ConcurrentHashMap<String, String>();
Map<String, String> prevSuffixes = this.cache.putIfAbsent(user, suffixes);
if (prevSuffixes != null) {
suffixes = prevSuffixes;
}
}
suffixes.put(sessionId, suffix);
}
@Override
public void removeQueueSuffix(String user, String sessionId) {
Map<String, String> suffixes = this.cache.get(user);
if (suffixes != null) {
if (suffixes.remove(sessionId) != null) {
this.cache.remove(user, Collections.emptyMap());
}
}
}
@Override
public Set<String> getUserQueueSuffixes(String user) {
Map<String, String> suffixes = this.cache.get(user);
return (suffixes != null) ? new HashSet<String>(suffixes.values()) : Collections.<String>emptySet();
}
@Override
public String getUserQueueSuffix(String user, String sessionId) {
Map<String, String> suffixes = this.cache.get(user);
if (suffixes != null) {
return suffixes.get(sessionId);
}
return null;
}
}

View File

@@ -23,20 +23,19 @@ import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.core.MessageSendingOperations;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import java.util.Set;
/**
* Supports destinations prefixed with "/user/{username}", transforms the
* destination to a unique queue to which the user is subscribed, and then sends
* the message for further processing.
*
* <p>The target destination has the prefix removed and a unique queue suffix,
* resolved via {@link #setUserQueueSuffixResolver(UserQueueSuffixResolver)}, appended.
* For example a destination such as "/user/john/queue/trade-confirmation" could
* be transformed to "/queue/trade-confirmation/i9oqdfzo".
* Provides support for messages sent to "user" destinations, translating the
* destination to one or more user-specific destination(s) and then sending message(s)
* with the updated target destination using the provided messaging template.
* <p>
* See {@link UserDestinationResolver} for more details and examples.
*
* @author Rossen Stoyanchev
* @since 4.0
@@ -45,140 +44,67 @@ public class UserDestinationMessageHandler implements MessageHandler {
private static final Log logger = LogFactory.getLog(UserDestinationMessageHandler.class);
private final MessageSendingOperations<String> messagingTemplate;
private String destinationPrefix = "/user/";
private UserQueueSuffixResolver userQueueSuffixResolver = new SimpleUserQueueSuffixResolver();
private final UserDestinationResolver userDestinationResolver;
/**
* Create an instance of the handler with the given messaging template and a
* user destination resolver.
*
* @param messagingTemplate
* @param resolver the resolver to use to find queue suffixes for a user
* @param messagingTemplate a messaging template to use for sending messages
* with translated user destinations
* @param userDestinationResolver the resolver to use to find queue suffixes for a user
*/
public UserDestinationMessageHandler(MessageSendingOperations<String> messagingTemplate,
UserQueueSuffixResolver userQueueSuffixResolver) {
UserDestinationResolver userDestinationResolver) {
Assert.notNull(messagingTemplate, "messagingTemplate is required");
Assert.notNull(userQueueSuffixResolver, "userQueueSuffixResolver is required");
Assert.notNull(userDestinationResolver, "destinationResolver is required");
this.messagingTemplate = messagingTemplate;
this.userQueueSuffixResolver = userQueueSuffixResolver;
this.userDestinationResolver = userDestinationResolver;
}
/**
* <p>The default prefix is "/user".
* @param prefix the prefix to set
* Return the configured {@link UserDestinationResolver}.
*/
public void setDestinationPrefix(String prefix) {
Assert.hasText(prefix, "prefix is required");
this.destinationPrefix = prefix.endsWith("/") ? prefix : prefix + "/";
public UserDestinationResolver getUserDestinationResolver() {
return this.userDestinationResolver;
}
/**
* @return the prefix
*/
public String getDestinationPrefix() {
return this.destinationPrefix;
}
/**
* @return the resolver for queue suffixes for a user
*/
public UserQueueSuffixResolver getUserQueueSuffixResolver() {
return this.userQueueSuffixResolver;
}
/**
* @return the messagingTemplate
* Return the configured messaging template for sending messages with
* translated destinations.
*/
public MessageSendingOperations<String> getMessagingTemplate() {
return this.messagingTemplate;
}
@Override
public void handleMessage(Message<?> message) throws MessagingException {
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
SimpMessageType messageType = headers.getMessageType();
String destination = headers.getDestination();
if (!SimpMessageType.MESSAGE.equals(messageType)) {
return;
}
if (!checkDestination(destination)) {
return;
}
if (logger.isTraceEnabled()) {
logger.trace("Processing message to destination " + destination);
logger.trace("Handling message " + message);
}
UserDestinationParser destinationParser = new UserDestinationParser(destination);
String user = destinationParser.getUser();
if (user == null) {
if (logger.isErrorEnabled()) {
logger.error("Ignoring message, expected destination pattern \"" + this.destinationPrefix
+ "{userId}/**\": " + destination);
}
Set<String> destinations = this.userDestinationResolver.resolveDestination(message);
if (CollectionUtils.isEmpty(destinations)) {
return;
}
for (String sessionId : this.userQueueSuffixResolver.getUserQueueSuffixes(user)) {
String targetDestination = destinationParser.getTargetDestination(sessionId);
for (String targetDestination : destinations) {
if (logger.isTraceEnabled()) {
logger.trace("Sending message to resolved user destination: " + targetDestination);
}
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
headers.setDestination(targetDestination);
message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build();
if (logger.isTraceEnabled()) {
logger.trace("Sending message to resolved target destination " + targetDestination);
}
this.messagingTemplate.send(targetDestination, message);
}
}
private boolean checkDestination(String destination) {
if (destination != null) {
if (destination.startsWith(this.destinationPrefix)) {
return true;
}
}
return false;
}
private class UserDestinationParser {
private final String user;
private final String targetDestination;
public UserDestinationParser(String destination) {
int userStartIndex = destinationPrefix.length();
int userEndIndex = destination.indexOf('/', userStartIndex);
if (userEndIndex > 0) {
this.user = destination.substring(userStartIndex, userEndIndex);
this.targetDestination = destination.substring(userEndIndex);
}
else {
this.user = null;
this.targetDestination = null;
}
}
public String getUser() {
return this.user;
}
public String getTargetDestination(String sessionId) {
return (this.targetDestination != null) ? this.targetDestination + sessionId : null;
}
}
}

View File

@@ -0,0 +1,54 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.handler;
import org.springframework.messaging.Message;
import java.util.Set;
/**
* A strategy for resolving unique, user destinations per session. User destinations
* provide a user with the ability to subscribe to a queue unique to their session
* as well others with the ability to send messages to those queues.
* <p>
* For example when a user attempts to subscribe to "/user/queue/position-updates",
* the destination may be resolved to "/queue/position-updates-useri9oqdfzo" yielding a
* unique queue name that does not collide with any other user attempting to do the same.
* Subsequently when messages are sent to "/user/{username}/queue/position-updates",
* the destination is translated to "/queue/position-updates-useri9oqdfzo".
*
* @author Rossen Stoyanchev
* @since 4.0
*
* @see UserDestinationMessageHandler
*/
public interface UserDestinationResolver {
/**
* Resolve the destination of the message to one or more user/session-specific target
* destinations. If the user has multiple sessions, the method may return more than
* one target destinations.
*
* @param message the message to resolve
*
* @return the resolved unique user destinations or an empty Set if the message
* destination is not recognized as a user destination
*/
Set<String> resolveDestination(Message<?> message);
}

View File

@@ -1,48 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.handler;
import java.util.Set;
/**
* A strategy for resolving unique queue suffixes for a connected user.
* There can be only one suffix per user per session.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface UserQueueSuffixResolver {
/**
* Retrieve the suffixes for all sessions associated with this user.
*
* @param user the user name
* @return a Set with zero, one, or more, queue suffixes
*/
Set<String> getUserQueueSuffixes(String user);
/**
* Retrieve the queue suffix associated with the given user session.
*
* @param user the user name
* @param sessionId the session id
* @return a queue suffix or {@code null}
*/
String getUserQueueSuffix(String user, String sessionId);
}

View File

@@ -0,0 +1,44 @@
package org.springframework.messaging.simp.handler;
import java.util.Set;
/**
* A registry for looking up active session id's by user.
* <p>
* Used in support of resolving unique session-specific user destinations.
* See {@link DefaultUserDestinationResolver} for more details.
*
* @author Rossen Stoyanchev
* @since 4.0
*
* @see DefaultUserDestinationResolver
*/
public interface UserSessionRegistry {
/**
* Return the active session id's for the given user.
*
* @param user the user
* @return a set with 0 or more session id's
*/
Set<String> getSessionIds(String user);
/**
* Register an active session id for the given user.
*
* @param user the user
* @param sessionId the session id
*/
void registerSessionId(String user, String sessionId);
/**
* Unregister the session id for a user.
*
* @param user the user
* @param sessionId the session id
*/
void unregisterSessionId(String user, String sessionId);
}

View File

@@ -30,8 +30,7 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.websocket.SubProtocolHandler;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.handler.MutableUserQueueSuffixResolver;
import org.springframework.messaging.simp.handler.SimpleUserQueueSuffixResolver;
import org.springframework.messaging.simp.handler.UserSessionRegistry;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.web.socket.CloseStatus;
@@ -45,6 +44,7 @@ import org.springframework.web.socket.WebSocketSession;
*
* @author Rossen Stoyanchev
* @author Andy Wilkinson
* @since 4.0
*/
public class StompProtocolHandler implements SubProtocolHandler {
@@ -54,36 +54,30 @@ public class StompProtocolHandler implements SubProtocolHandler {
*/
public static final String CONNECTED_USER_HEADER = "user-name";
/**
* A suffix unique to the current session that a client can use to append to
* a destination to make it unique.
*
* @see {@link org.springframework.messaging.simp.handler.UserDestinationMessageHandler}
*/
public static final String QUEUE_SUFFIX_HEADER = "queue-suffix";
private static final Log logger = LogFactory.getLog(StompProtocolHandler.class);
private final Log logger = LogFactory.getLog(StompProtocolHandler.class);
private final StompDecoder stompDecoder = new StompDecoder();
private final StompEncoder stompEncoder = new StompEncoder();
private MutableUserQueueSuffixResolver queueSuffixResolver = new SimpleUserQueueSuffixResolver();
private UserSessionRegistry userSessionRegistry;
/**
* Configure a resolver to use to maintain queue suffixes for user
* Provide a registry with which to register active user session ids.
*
* @see {@link org.springframework.messaging.simp.handler.UserDestinationMessageHandler}
*/
public void setUserQueueSuffixResolver(MutableUserQueueSuffixResolver resolver) {
this.queueSuffixResolver = resolver;
public void setUserSessionRegistry(UserSessionRegistry registry) {
this.userSessionRegistry = registry;
}
/**
* @return the resolver for queue suffixes for a user
* @return the configured UserSessionRegistry.
*/
public MutableUserQueueSuffixResolver getUserQueueSuffixResolver() {
return this.queueSuffixResolver;
public UserSessionRegistry getUserSessionRegistry() {
return this.userSessionRegistry;
}
@Override
@@ -162,7 +156,7 @@ public class StompProtocolHandler implements SubProtocolHandler {
}
if (headers.getCommand() == StompCommand.CONNECTED) {
augmentConnectedHeaders(headers, session);
afterStompSessionConnected(headers, session);
}
if (StompCommand.MESSAGE.equals(headers.getCommand()) && (headers.getSubscriptionId() == null)) {
@@ -189,6 +183,7 @@ public class StompProtocolHandler implements SubProtocolHandler {
session.close(CloseStatus.PROTOCOL_ERROR);
}
catch (IOException e) {
// Ignore
}
}
}
@@ -216,15 +211,13 @@ public class StompProtocolHandler implements SubProtocolHandler {
}
}
private void augmentConnectedHeaders(StompHeaderAccessor headers, WebSocketSession session) {
private void afterStompSessionConnected(StompHeaderAccessor headers, WebSocketSession session) {
Principal principal = session.getPrincipal();
if (principal != null) {
headers.setNativeHeader(CONNECTED_USER_HEADER, principal.getName());
headers.setNativeHeader(QUEUE_SUFFIX_HEADER, session.getId());
if (this.queueSuffixResolver != null) {
if (this.userSessionRegistry != null) {
String suffix = session.getId();
this.queueSuffixResolver.addQueueSuffix(principal.getName(), session.getId(), suffix);
this.userSessionRegistry.registerSessionId(principal.getName(), session.getId());
}
}
}
@@ -242,8 +235,8 @@ public class StompProtocolHandler implements SubProtocolHandler {
@Override
public void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus, MessageChannel outputChannel) {
if ((this.queueSuffixResolver != null) && (session.getPrincipal() != null)) {
this.queueSuffixResolver.removeQueueSuffix(session.getPrincipal().getName(), session.getId());
if ((this.userSessionRegistry != null) && (session.getPrincipal() != null)) {
this.userSessionRegistry.unregisterSessionId(session.getPrincipal().getName(), session.getId());
}
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);