Add XML namespace for WebSocket config

This commit adds an XML namespace equivalent of @EnableWebSocket and
@EnableWebSocketMessageBroker. Those are <websocket:handlers> and
<websocket:message-broker> respectively.

Examples can be found in the test suite.

This commit also alters the way MessageHandler's subscribe to their
respective MessageChannel's of interest. Rather than performing the
subscriptions in configuration code, the message channels are now
passed into MessageHandler's so they can subscribe themselves on
startup.

Issue: SPR-11063
This commit is contained in:
Brian Clozel
2013-11-26 20:04:57 +01:00
committed by Rossen Stoyanchev
parent 8f1fefc159
commit 10f5d96a78
44 changed files with 2434 additions and 171 deletions

View File

@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.List;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler;
import org.springframework.util.Assert;
@@ -33,19 +34,31 @@ import org.springframework.util.Assert;
*/
public abstract class AbstractBrokerRegistration {
private final SubscribableChannel clientInboundChannel;
private final MessageChannel clientOutboundChannel;
private final List<String> destinationPrefixes;
public AbstractBrokerRegistration(MessageChannel clientOutboundChannel, String[] destinationPrefixes) {
public AbstractBrokerRegistration(SubscribableChannel clientInboundChannel,
MessageChannel clientOutboundChannel, String[] destinationPrefixes) {
Assert.notNull(clientOutboundChannel, "'clientInboundChannel' must not be null");
Assert.notNull(clientOutboundChannel, "'clientOutboundChannel' must not be null");
this.clientInboundChannel = clientInboundChannel;
this.clientOutboundChannel = clientOutboundChannel;
this.destinationPrefixes = (destinationPrefixes != null)
? Arrays.<String>asList(destinationPrefixes) : Collections.<String>emptyList();
}
protected SubscribableChannel getClientInboundChannel() {
return this.clientInboundChannel;
}
protected MessageChannel getClientOutboundChannel() {
return this.clientOutboundChannel;
}
@@ -54,6 +67,6 @@ public abstract class AbstractBrokerRegistration {
return this.destinationPrefixes;
}
protected abstract AbstractBrokerMessageHandler getMessageHandler();
protected abstract AbstractBrokerMessageHandler getMessageHandler(SubscribableChannel brokerChannel);
}

View File

@@ -47,7 +47,7 @@ import java.util.List;
* into any application component to send messages.
* <p>
* Sub-classes are responsible for the part of the configuration that feed messages
* to and from the client inbound/outbound channels (e.g. STOMP over WebSokcet).
* to and from the client inbound/outbound channels (e.g. STOMP over WebSocket).
*
* @author Rossen Stoyanchev
* @since 4.0
@@ -86,7 +86,7 @@ public abstract class AbstractMessageBrokerConfiguration {
public ThreadPoolTaskExecutor clientInboundChannelExecutor() {
TaskExecutorRegistration r = getClientInboundChannelRegistration().getTaskExecutorRegistration();
ThreadPoolTaskExecutor executor = (r != null) ? r.getTaskExecutor() : new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("ClientInboundChannel-");
executor.setThreadNamePrefix("clientInboundChannel-");
return executor;
}
@@ -121,7 +121,7 @@ public abstract class AbstractMessageBrokerConfiguration {
public ThreadPoolTaskExecutor clientOutboundChannelExecutor() {
TaskExecutorRegistration r = getClientOutboundChannelRegistration().getTaskExecutorRegistration();
ThreadPoolTaskExecutor executor = (r != null) ? r.getTaskExecutor() : new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("ClientOutboundChannel-");
executor.setThreadNamePrefix("clientOutboundChannel-");
return executor;
}
@@ -160,7 +160,7 @@ public abstract class AbstractMessageBrokerConfiguration {
public ThreadPoolTaskExecutor brokerChannelExecutor() {
TaskExecutorRegistration r = getBrokerRegistry().getBrokerChannelRegistration().getTaskExecutorRegistration();
ThreadPoolTaskExecutor executor = (r != null) ? r.getTaskExecutor() : new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("BrokerChannel-");
executor.setThreadNamePrefix("brokerChannel-");
return executor;
}
@@ -170,7 +170,7 @@ public abstract class AbstractMessageBrokerConfiguration {
*/
protected final MessageBrokerRegistry getBrokerRegistry() {
if (this.brokerRegistry == null) {
MessageBrokerRegistry registry = new MessageBrokerRegistry(clientOutboundChannel());
MessageBrokerRegistry registry = new MessageBrokerRegistry(clientInboundChannel(), clientOutboundChannel());
configureMessageBroker(registry);
this.brokerRegistry = registry;
}
@@ -187,45 +187,30 @@ public abstract class AbstractMessageBrokerConfiguration {
@Bean
public SimpAnnotationMethodMessageHandler simpAnnotationMethodMessageHandler() {
SimpAnnotationMethodMessageHandler handler =
new SimpAnnotationMethodMessageHandler(brokerMessagingTemplate(), clientOutboundChannel());
SimpAnnotationMethodMessageHandler handler = new SimpAnnotationMethodMessageHandler(
clientInboundChannel(), clientOutboundChannel(), brokerMessagingTemplate());
handler.setDestinationPrefixes(getBrokerRegistry().getApplicationDestinationPrefixes());
handler.setMessageConverter(brokerMessageConverter());
clientInboundChannel().subscribe(handler);
return handler;
}
@Bean
public AbstractBrokerMessageHandler simpleBrokerMessageHandler() {
SimpleBrokerMessageHandler handler = getBrokerRegistry().getSimpleBroker();
if (handler != null) {
clientInboundChannel().subscribe(handler);
brokerChannel().subscribe(handler);
return handler;
}
return noopBroker;
SimpleBrokerMessageHandler handler = getBrokerRegistry().getSimpleBroker(brokerChannel());
return (handler != null) ? handler : noopBroker;
}
@Bean
public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
AbstractBrokerMessageHandler handler = getBrokerRegistry().getStompBrokerRelay();
if (handler != null) {
clientInboundChannel().subscribe(handler);
brokerChannel().subscribe(handler);
return handler;
}
return noopBroker;
AbstractBrokerMessageHandler handler = getBrokerRegistry().getStompBrokerRelay(brokerChannel());
return (handler != null) ? handler : noopBroker;
}
@Bean
public UserDestinationMessageHandler userDestinationMessageHandler() {
UserDestinationMessageHandler handler = new UserDestinationMessageHandler(
brokerMessagingTemplate(), userDestinationResolver());
clientInboundChannel().subscribe(handler);
brokerChannel().subscribe(handler);
clientInboundChannel(), clientOutboundChannel(), brokerChannel(), userDestinationResolver());
return handler;
}

View File

@@ -20,6 +20,7 @@ import java.util.Arrays;
import java.util.Collection;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.simp.handler.SimpleBrokerMessageHandler;
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
import org.springframework.util.Assert;
@@ -32,11 +33,13 @@ import org.springframework.util.Assert;
*/
public class MessageBrokerRegistry {
private final SubscribableChannel clientInboundChannel;
private final MessageChannel clientOutboundChannel;
private SimpleBrokerRegistration simpleBroker;
private SimpleBrokerRegistration simpleBrokerRegistration;
private StompBrokerRelayRegistration stompRelay;
private StompBrokerRelayRegistration brokerRelayRegistration;
private String[] applicationDestinationPrefixes;
@@ -45,8 +48,10 @@ public class MessageBrokerRegistry {
private ChannelRegistration brokerChannelRegistration = new ChannelRegistration();
public MessageBrokerRegistry(MessageChannel clientOutboundChannel) {
public MessageBrokerRegistry(SubscribableChannel clientInboundChannel, MessageChannel clientOutboundChannel) {
Assert.notNull(clientInboundChannel);
Assert.notNull(clientOutboundChannel);
this.clientInboundChannel = clientInboundChannel;
this.clientOutboundChannel = clientOutboundChannel;
}
@@ -55,8 +60,9 @@ public class MessageBrokerRegistry {
* destinations targeting the broker (e.g. destinations prefixed with "/topic").
*/
public SimpleBrokerRegistration enableSimpleBroker(String... destinationPrefixes) {
this.simpleBroker = new SimpleBrokerRegistration(this.clientOutboundChannel, destinationPrefixes);
return this.simpleBroker;
this.simpleBrokerRegistration = new SimpleBrokerRegistration(
this.clientInboundChannel, this.clientOutboundChannel, destinationPrefixes);
return this.simpleBrokerRegistration;
}
/**
@@ -65,8 +71,9 @@ public class MessageBrokerRegistry {
* destinations.
*/
public StompBrokerRelayRegistration enableStompBrokerRelay(String... destinationPrefixes) {
this.stompRelay = new StompBrokerRelayRegistration(this.clientOutboundChannel, destinationPrefixes);
return this.stompRelay;
this.brokerRelayRegistration = new StompBrokerRelayRegistration(
this.clientInboundChannel, this.clientOutboundChannel, destinationPrefixes);
return this.brokerRelayRegistration;
}
/**
@@ -113,19 +120,21 @@ public class MessageBrokerRegistry {
}
protected SimpleBrokerMessageHandler getSimpleBroker() {
initSimpleBrokerIfNecessary();
return (this.simpleBroker != null) ? this.simpleBroker.getMessageHandler() : null;
}
protected void initSimpleBrokerIfNecessary() {
if ((this.simpleBroker == null) && (this.stompRelay == null)) {
this.simpleBroker = new SimpleBrokerRegistration(this.clientOutboundChannel, null);
protected SimpleBrokerMessageHandler getSimpleBroker(SubscribableChannel brokerChannel) {
if ((this.simpleBrokerRegistration == null) && (this.brokerRelayRegistration == null)) {
enableSimpleBroker();
}
if (this.simpleBrokerRegistration != null) {
return this.simpleBrokerRegistration.getMessageHandler(brokerChannel);
}
return null;
}
protected StompBrokerRelayMessageHandler getStompBrokerRelay() {
return (this.stompRelay != null) ? this.stompRelay.getMessageHandler() : null;
protected StompBrokerRelayMessageHandler getStompBrokerRelay(SubscribableChannel brokerChannel) {
if (this.brokerRelayRegistration != null) {
return this.brokerRelayRegistration.getMessageHandler(brokerChannel);
}
return null;
}
protected Collection<String> getApplicationDestinationPrefixes() {

View File

@@ -17,6 +17,7 @@
package org.springframework.messaging.simp.config;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.simp.handler.SimpleBrokerMessageHandler;
/**
@@ -28,14 +29,17 @@ import org.springframework.messaging.simp.handler.SimpleBrokerMessageHandler;
public class SimpleBrokerRegistration extends AbstractBrokerRegistration {
public SimpleBrokerRegistration(MessageChannel clientOutboundChannel, String[] destinationPrefixes) {
super(clientOutboundChannel, destinationPrefixes);
public SimpleBrokerRegistration(SubscribableChannel clientInboundChannel,
MessageChannel clientOutboundChannel, String[] destinationPrefixes) {
super(clientInboundChannel, clientOutboundChannel, destinationPrefixes);
}
@Override
protected SimpleBrokerMessageHandler getMessageHandler() {
return new SimpleBrokerMessageHandler(getClientOutboundChannel(), getDestinationPrefixes());
protected SimpleBrokerMessageHandler getMessageHandler(SubscribableChannel brokerChannel) {
return new SimpleBrokerMessageHandler(getClientInboundChannel(),
getClientOutboundChannel(), brokerChannel, getDestinationPrefixes());
}
}

View File

@@ -17,6 +17,7 @@
package org.springframework.messaging.simp.config;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
import org.springframework.util.Assert;
@@ -43,8 +44,10 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
private boolean autoStartup = true;
public StompBrokerRelayRegistration(MessageChannel clientOutboundChannel, String[] destinationPrefixes) {
super(clientOutboundChannel, destinationPrefixes);
public StompBrokerRelayRegistration(SubscribableChannel clientInboundChannel,
MessageChannel clientOutboundChannel, String[] destinationPrefixes) {
super(clientInboundChannel, clientOutboundChannel, destinationPrefixes);
}
@@ -119,10 +122,10 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
}
protected StompBrokerRelayMessageHandler getMessageHandler() {
protected StompBrokerRelayMessageHandler getMessageHandler(SubscribableChannel brokerChannel) {
StompBrokerRelayMessageHandler handler =
new StompBrokerRelayMessageHandler(getClientOutboundChannel(), getDestinationPrefixes());
StompBrokerRelayMessageHandler handler = new StompBrokerRelayMessageHandler(getClientInboundChannel(),
getClientOutboundChannel(), brokerChannel, getDestinationPrefixes());
handler.setRelayHost(this.relayHost);
handler.setRelayPort(this.relayPort);

View File

@@ -27,11 +27,13 @@ import java.util.Set;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.convert.ConversionService;
import org.springframework.format.support.DefaultFormattingConversionService;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.core.AbstractMessageSendingTemplate;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.support.AnnotationExceptionHandlerMethodResolver;
@@ -54,6 +56,7 @@ import org.springframework.messaging.simp.annotation.support.PrincipalMethodArgu
import org.springframework.messaging.simp.annotation.support.SendToMethodReturnValueHandler;
import org.springframework.messaging.simp.annotation.support.SubscriptionMethodReturnValueHandler;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.channel.AbstractSubscribableChannel;
import org.springframework.messaging.support.converter.ByteArrayMessageConverter;
import org.springframework.messaging.support.converter.CompositeMessageConverter;
import org.springframework.messaging.support.converter.MessageConverter;
@@ -74,30 +77,44 @@ import org.springframework.util.PathMatcher;
* @author Brian Clozel
* @since 4.0
*/
public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHandler<SimpMessageMappingInfo> {
public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHandler<SimpMessageMappingInfo>
implements SmartLifecycle {
private final SimpMessageSendingOperations brokerTemplate;
private final SubscribableChannel clientInboundChannel;
private final SimpMessageSendingOperations clientMessagingTemplate;
private final SimpMessageSendingOperations brokerTemplate;
private MessageConverter messageConverter;
private ConversionService conversionService = new DefaultFormattingConversionService();
private PathMatcher pathMatcher = new AntPathMatcher();
private Object lifecycleMonitor = new Object();
private volatile boolean running = false;
/**
* @param brokerTemplate a messaging template to send application messages to the broker
* Create an instance of SimpAnnotationMethodMessageHandler with the given
* message channels and broker messaging template.
*
* @param clientInboundChannel the channel for receiving messages from clients (e.g. WebSocket clients)
* @param clientOutboundChannel the channel for messages to clients (e.g. WebSocket clients)
* @param brokerTemplate a messaging template to send application messages to the broker
*/
public SimpAnnotationMethodMessageHandler(SimpMessageSendingOperations brokerTemplate,
MessageChannel clientOutboundChannel) {
public SimpAnnotationMethodMessageHandler(SubscribableChannel clientInboundChannel,
MessageChannel clientOutboundChannel, SimpMessageSendingOperations brokerTemplate) {
Assert.notNull(brokerTemplate, "BrokerTemplate must not be null");
Assert.notNull(clientOutboundChannel, "ClientOutboundChannel must not be null");
this.brokerTemplate = brokerTemplate;
Assert.notNull(clientInboundChannel, "clientInboundChannel must not be null");
Assert.notNull(clientOutboundChannel, "clientOutboundChannel must not be null");
Assert.notNull(brokerTemplate, "brokerTemplate must not be null");
this.clientInboundChannel = clientInboundChannel;
this.clientMessagingTemplate = new SimpMessagingTemplate(clientOutboundChannel);
this.brokerTemplate = brokerTemplate;
Collection<MessageConverter> converters = new ArrayList<MessageConverter>();
converters.add(new StringMessageConverter());
@@ -159,6 +176,46 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan
return this.pathMatcher;
}
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public int getPhase() {
return Integer.MAX_VALUE;
}
@Override
public final boolean isRunning() {
synchronized (this.lifecycleMonitor) {
return this.running;
}
}
@Override
public final void start() {
synchronized (this.lifecycleMonitor) {
this.clientInboundChannel.subscribe(this);
this.running = true;
}
}
@Override
public final void stop() {
synchronized (this.lifecycleMonitor) {
this.running = false;
this.clientInboundChannel.unsubscribe(this);
}
}
@Override
public final void stop(Runnable callback) {
synchronized (this.lifecycleMonitor) {
stop();
callback.run();
}
}
protected List<HandlerMethodArgumentResolver> initArgumentResolvers() {

View File

@@ -20,6 +20,7 @@ import java.util.Collection;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.support.MessageBuilder;
@@ -34,23 +35,49 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
private static final byte[] EMPTY_PAYLOAD = new byte[0];
private final MessageChannel messageChannel;
private final SubscribableChannel clientInboundChannel;
private final MessageChannel clientOutboundChannel;
private final SubscribableChannel brokerChannel;
private SubscriptionRegistry subscriptionRegistry = new DefaultSubscriptionRegistry();
/**
* @param messageChannel the channel to broadcast messages to
* Create a SimpleBrokerMessageHandler instance with the given message channels
* and destination prefixes.
*
* @param clientInboundChannel the channel for receiving messages from clients (e.g. WebSocket clients)
* @param clientOutboundChannel the channel for sending messages to clients (e.g. WebSocket clients)
* @param brokerChannel the channel for the application to send messages to the broker
*/
public SimpleBrokerMessageHandler(MessageChannel messageChannel, Collection<String> destinationPrefixes) {
public SimpleBrokerMessageHandler(SubscribableChannel clientInboundChannel, MessageChannel clientOutboundChannel,
SubscribableChannel brokerChannel, Collection<String> destinationPrefixes) {
super(destinationPrefixes);
Assert.notNull(messageChannel, "MessageChannel must not be null");
this.messageChannel = messageChannel;
Assert.notNull(clientInboundChannel, "'clientInboundChannel' must not be null");
Assert.notNull(clientOutboundChannel, "'clientOutboundChannel' must not be null");
Assert.notNull(brokerChannel, "'brokerChannel' must not be null");
this.clientInboundChannel = clientInboundChannel;
this.clientOutboundChannel = clientOutboundChannel;
this.brokerChannel = brokerChannel;
}
public MessageChannel getMessageChannel() {
return this.messageChannel;
public SubscribableChannel getClientInboundChannel() {
return this.clientInboundChannel;
}
public MessageChannel getClientOutboundChannel() {
return this.clientOutboundChannel;
}
public SubscribableChannel getBrokerChannel() {
return this.brokerChannel;
}
public void setSubscriptionRegistry(SubscriptionRegistry subscriptionRegistry) {
@@ -66,11 +93,15 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
@Override
public void startInternal() {
publishBrokerAvailableEvent();
this.clientInboundChannel.subscribe(this);
this.brokerChannel.subscribe(this);
}
@Override
public void stopInternal() {
publishBrokerUnavailableEvent();
this.clientInboundChannel.unsubscribe(this);
this.brokerChannel.unsubscribe(this);
}
@Override
@@ -106,7 +137,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
replyHeaders.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, message);
Message<byte[]> connectAck = MessageBuilder.withPayload(EMPTY_PAYLOAD).setHeaders(replyHeaders).build();
this.messageChannel.send(connectAck);
this.clientOutboundChannel.send(connectAck);
}
}
@@ -124,7 +155,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
Object payload = message.getPayload();
Message<?> clientMessage = MessageBuilder.withPayload(payload).setHeaders(headers).build();
try {
this.messageChannel.send(clientMessage);
this.clientOutboundChannel.send(clientMessage);
}
catch (Throwable ex) {
logger.error("Failed to send message to destination=" + destination +

View File

@@ -20,10 +20,10 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.context.SmartLifecycle;
import org.springframework.messaging.*;
import org.springframework.messaging.core.MessageSendingOperations;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
@@ -37,28 +37,46 @@ import org.springframework.util.CollectionUtils;
* @author Rossen Stoyanchev
* @since 4.0
*/
public class UserDestinationMessageHandler implements MessageHandler {
public class UserDestinationMessageHandler implements MessageHandler, SmartLifecycle {
private static final Log logger = LogFactory.getLog(UserDestinationMessageHandler.class);
private final MessageSendingOperations<String> messagingTemplate;
private final SubscribableChannel clientInboundChannel;
private final MessageChannel clientOutboundChannel;
private final SubscribableChannel brokerChannel;
private final MessageSendingOperations<String> brokerMessagingTemplate;
private final UserDestinationResolver userDestinationResolver;
private Object lifecycleMonitor = new Object();
private volatile boolean running = false;
/**
* Create an instance of the handler with the given messaging template and a
* user destination resolver.
* @param messagingTemplate a messaging template to use for sending messages
* with translated user destinations
* @param clientInChannel the channel for receiving messages from clients (e.g. WebSocket clients)
* @param clientOutChannel the channel for sending messages to clients (e.g. WebSocket clients)
* @param brokerChannel the channel for sending messages with translated user destinations
* @param userDestinationResolver the resolver to use to find queue suffixes for a user
*/
public UserDestinationMessageHandler(MessageSendingOperations<String> messagingTemplate,
UserDestinationResolver userDestinationResolver) {
Assert.notNull(messagingTemplate, "MessagingTemplate must not be null");
public UserDestinationMessageHandler(SubscribableChannel clientInChannel, MessageChannel clientOutChannel,
SubscribableChannel brokerChannel, UserDestinationResolver userDestinationResolver) {
Assert.notNull(clientInChannel, "'clientInChannel' must not be null");
Assert.notNull(clientOutChannel, "'clientOutChannel' must not be null");
Assert.notNull(brokerChannel, "'brokerChannel' must not be null");
Assert.notNull(userDestinationResolver, "DestinationResolver must not be null");
this.messagingTemplate = messagingTemplate;
this.clientInboundChannel = clientInChannel;
this.clientOutboundChannel = clientOutChannel;
this.brokerChannel = brokerChannel;
this.brokerMessagingTemplate = new SimpMessagingTemplate(brokerChannel);
this.userDestinationResolver = userDestinationResolver;
}
@@ -73,10 +91,52 @@ public class UserDestinationMessageHandler implements MessageHandler {
* Return the configured messaging template for sending messages with
* translated destinations.
*/
public MessageSendingOperations<String> getMessagingTemplate() {
return this.messagingTemplate;
public MessageSendingOperations<String> getBrokerMessagingTemplate() {
return this.brokerMessagingTemplate;
}
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public int getPhase() {
return Integer.MAX_VALUE;
}
@Override
public final boolean isRunning() {
synchronized (this.lifecycleMonitor) {
return this.running;
}
}
@Override
public final void start() {
synchronized (this.lifecycleMonitor) {
this.clientInboundChannel.subscribe(this);
this.brokerChannel.subscribe(this);
this.running = true;
}
}
@Override
public final void stop() {
synchronized (this.lifecycleMonitor) {
this.running = false;
this.clientInboundChannel.unsubscribe(this);
this.brokerChannel.unsubscribe(this);
}
}
@Override
public final void stop(Runnable callback) {
synchronized (this.lifecycleMonitor) {
stop();
callback.run();
}
}
@Override
public void handleMessage(Message<?> message) throws MessagingException {
@@ -90,7 +150,7 @@ public class UserDestinationMessageHandler implements MessageHandler {
if (logger.isDebugEnabled()) {
logger.debug("Sending message to resolved destination=" + targetDestination);
}
this.messagingTemplate.send(targetDestination, message);
this.brokerMessagingTemplate.send(targetDestination, message);
}
}

View File

@@ -21,10 +21,7 @@ import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.*;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler;
@@ -77,7 +74,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
private final MessageChannel messageChannel;
private final SubscribableChannel clientInboundChannel;
private final MessageChannel clientOutboundChannel;
private final SubscribableChannel brokerChannel;
private String relayHost = "127.0.0.1";
@@ -100,14 +101,28 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
/**
* @param messageChannel the channel to send messages from the STOMP broker to
* Create a StompBrokerRelayMessageHandler instance with the given message channels
* and destination prefixes.
*
* @param clientInChannel the channel for receiving messages from clients (e.g. WebSocket clients)
* @param clientOutChannel the channel for sending messages to clients (e.g. WebSocket clients)
* @param brokerChannel the channel for the application to send messages to the broker
* @param destinationPrefixes the broker supported destination prefixes; destinations
* that do not match the given prefix are ignored.
*/
public StompBrokerRelayMessageHandler(MessageChannel messageChannel, Collection<String> destinationPrefixes) {
public StompBrokerRelayMessageHandler(SubscribableChannel clientInChannel, MessageChannel clientOutChannel,
SubscribableChannel brokerChannel, Collection<String> destinationPrefixes) {
super(destinationPrefixes);
Assert.notNull(messageChannel, "MessageChannel must not be null");
this.messageChannel = messageChannel;
Assert.notNull(clientInChannel, "'clientInChannel' must not be null");
Assert.notNull(clientOutChannel, "'clientOutChannel' must not be null");
Assert.notNull(brokerChannel, "'brokerChannel' must not be null");
this.clientInboundChannel = clientInChannel;
this.clientOutboundChannel = clientOutChannel;
this.brokerChannel = brokerChannel;
}
@@ -242,6 +257,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@Override
protected void startInternal() {
this.clientInboundChannel.subscribe(this);
this.brokerChannel.subscribe(this);
if (this.tcpClient == null) {
this.tcpClient = new ReactorNettyTcpClient<byte[]>(this.relayHost, this.relayPort, new StompCodec());
}
@@ -265,6 +283,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@Override
protected void stopInternal() {
this.clientInboundChannel.unsubscribe(this);
this.brokerChannel.unsubscribe(this);
for (StompConnectionHandler handler : this.connectionHandlers.values()) {
try {
handler.resetTcpConnection();
@@ -416,7 +438,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
protected void sendMessageToClient(Message<?> message) {
if (this.isRemoteClientSession) {
StompBrokerRelayMessageHandler.this.messageChannel.send(message);
StompBrokerRelayMessageHandler.this.clientOutboundChannel.send(message);
}
}

View File

@@ -43,7 +43,7 @@ public abstract class AbstractSubscribableChannel extends AbstractMessageChannel
/**
* Whether the given {@link MessageHandler} is already subscribed.
*/
protected abstract boolean hasSubscription(MessageHandler handler);
public abstract boolean hasSubscription(MessageHandler handler);
/**
* Subscribe the given {@link MessageHandler}.

View File

@@ -62,7 +62,7 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel {
}
@Override
protected boolean hasSubscription(MessageHandler handler) {
public boolean hasSubscription(MessageHandler handler) {
return this.handlers.contains(handler);
}