Add tests to spring-messaging
This commit is contained in:
@@ -31,10 +31,10 @@ import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* A {@link HandlerMethodReturnValueHandler} for replying directly to a subscription. It
|
||||
* supports methods annotated with {@link SubscribeEvent} that do not also annotated with
|
||||
* neither {@link ReplyTo} nor {@link ReplyToUser}.
|
||||
*
|
||||
* <p>The value returned from the method is converted, and turned to a {@link Message} and
|
||||
* supports methods annotated with {@link SubscribeEvent} unless they're also annotated
|
||||
* with {@link ReplyTo} or {@link ReplyToUser}.
|
||||
* <p>
|
||||
* The value returned from the method is converted, and turned to a {@link Message} and
|
||||
* then enriched with the sessionId, subscriptionId, and destination of the input message.
|
||||
* The message is then sent directly back to the connected client.
|
||||
*
|
||||
@@ -73,8 +73,7 @@ public class SubscriptionMethodReturnValueHandler implements HandlerMethodReturn
|
||||
String destination = inputHeaders.getDestination();
|
||||
|
||||
Assert.state(inputHeaders.getSubscriptionId() != null,
|
||||
"No subsriptiondId in input message. Add @ReplyTo or @ReplyToUser to method: "
|
||||
+ returnType.getMethod());
|
||||
"No subsriptiondId in input message to method " + returnType.getMethod());
|
||||
|
||||
MessagePostProcessor postProcessor = new SubscriptionHeaderPostProcessor(sessionId, subscriptionId);
|
||||
this.messagingTemplate.convertAndSend(destination, returnValue, postProcessor);
|
||||
|
||||
@@ -30,8 +30,7 @@ import org.springframework.web.socket.sockjs.transport.handler.WebSocketTranspor
|
||||
|
||||
|
||||
/**
|
||||
* A helper class for configuring STOMP protocol handling over WebSocket
|
||||
* with optional SockJS fallback options.
|
||||
* An abstract base class class for configuring STOMP over WebSocket/SockJS endpoints.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 4.0
|
||||
@@ -46,33 +45,36 @@ public abstract class AbstractStompEndpointRegistration<M> implements StompEndpo
|
||||
|
||||
private StompSockJsServiceRegistration sockJsServiceRegistration;
|
||||
|
||||
private final TaskScheduler defaultSockJsTaskScheduler;
|
||||
private final TaskScheduler sockJsTaskScheduler;
|
||||
|
||||
|
||||
public AbstractStompEndpointRegistration(String[] paths, SubProtocolWebSocketHandler webSocketHandler,
|
||||
TaskScheduler defaultSockJsTaskScheduler) {
|
||||
TaskScheduler sockJsTaskScheduler) {
|
||||
|
||||
Assert.notEmpty(paths, "No paths specified");
|
||||
this.paths = paths;
|
||||
this.wsHandler = webSocketHandler;
|
||||
this.defaultSockJsTaskScheduler = defaultSockJsTaskScheduler;
|
||||
this.sockJsTaskScheduler = sockJsTaskScheduler;
|
||||
}
|
||||
|
||||
|
||||
protected SubProtocolWebSocketHandler getWsHandler() {
|
||||
return this.wsHandler;
|
||||
}
|
||||
|
||||
/**
|
||||
* Provide a custom or pre-configured {@link HandshakeHandler}. This property is
|
||||
* optional.
|
||||
*/
|
||||
@Override
|
||||
public StompEndpointRegistration setHandshakeHandler(HandshakeHandler handshakeHandler) {
|
||||
this.handshakeHandler = handshakeHandler;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Enable SockJS fallback options.
|
||||
*/
|
||||
@Override
|
||||
public SockJsServiceRegistration withSockJS() {
|
||||
|
||||
this.sockJsServiceRegistration = new StompSockJsServiceRegistration(this.defaultSockJsTaskScheduler);
|
||||
this.sockJsServiceRegistration = new StompSockJsServiceRegistration(this.sockJsTaskScheduler);
|
||||
|
||||
if (this.handshakeHandler != null) {
|
||||
WebSocketTransportHandler transportHandler = new WebSocketTransportHandler(this.handshakeHandler);
|
||||
@@ -82,7 +84,7 @@ public abstract class AbstractStompEndpointRegistration<M> implements StompEndpo
|
||||
return this.sockJsServiceRegistration;
|
||||
}
|
||||
|
||||
protected M getMappings() {
|
||||
protected final M getMappings() {
|
||||
|
||||
M mappings = createMappings();
|
||||
|
||||
|
||||
@@ -33,7 +33,7 @@ import reactor.util.Assert;
|
||||
*/
|
||||
public class MessageBrokerConfigurer {
|
||||
|
||||
private final MessageChannel webSocketReplyChannel;
|
||||
private final MessageChannel webSocketResponseChannel;
|
||||
|
||||
private SimpleBrokerRegistration simpleBroker;
|
||||
|
||||
@@ -42,18 +42,18 @@ public class MessageBrokerConfigurer {
|
||||
private String[] annotationMethodDestinationPrefixes;
|
||||
|
||||
|
||||
public MessageBrokerConfigurer(MessageChannel webSocketReplyChannel) {
|
||||
Assert.notNull(webSocketReplyChannel);
|
||||
this.webSocketReplyChannel = webSocketReplyChannel;
|
||||
public MessageBrokerConfigurer(MessageChannel webSocketResponseChannel) {
|
||||
Assert.notNull(webSocketResponseChannel);
|
||||
this.webSocketResponseChannel = webSocketResponseChannel;
|
||||
}
|
||||
|
||||
public SimpleBrokerRegistration enableSimpleBroker(String... destinationPrefixes) {
|
||||
this.simpleBroker = new SimpleBrokerRegistration(this.webSocketReplyChannel, destinationPrefixes);
|
||||
this.simpleBroker = new SimpleBrokerRegistration(this.webSocketResponseChannel, destinationPrefixes);
|
||||
return this.simpleBroker;
|
||||
}
|
||||
|
||||
public StompBrokerRelayRegistration enableStompBrokerRelay(String... destinationPrefixes) {
|
||||
this.stompRelay = new StompBrokerRelayRegistration(this.webSocketReplyChannel, destinationPrefixes);
|
||||
this.stompRelay = new StompBrokerRelayRegistration(this.webSocketResponseChannel, destinationPrefixes);
|
||||
return this.stompRelay;
|
||||
}
|
||||
|
||||
@@ -69,7 +69,7 @@ public class MessageBrokerConfigurer {
|
||||
|
||||
protected void initSimpleBrokerIfNecessary() {
|
||||
if ((this.simpleBroker == null) && (this.stompRelay == null)) {
|
||||
this.simpleBroker = new SimpleBrokerRegistration(this.webSocketReplyChannel, null);
|
||||
this.simpleBroker = new SimpleBrokerRegistration(this.webSocketResponseChannel, null);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -37,6 +37,8 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
|
||||
|
||||
private String applicationPasscode = "guest";
|
||||
|
||||
private boolean autoStartup = true;
|
||||
|
||||
|
||||
public StompBrokerRelayRegistration(MessageChannel webSocketReplyChannel, String[] destinationPrefixes) {
|
||||
super(webSocketReplyChannel, destinationPrefixes);
|
||||
@@ -52,13 +54,6 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the STOMP message broker host.
|
||||
*/
|
||||
protected String getRelayHost() {
|
||||
return this.relayHost;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the STOMP message broker port.
|
||||
*/
|
||||
@@ -67,13 +62,6 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the STOMP message broker port.
|
||||
*/
|
||||
protected int getRelayPort() {
|
||||
return this.relayPort;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the login for a "system" TCP connection used to send messages to the STOMP
|
||||
* broker without having a client session (e.g. REST/HTTP request handling method).
|
||||
@@ -84,13 +72,6 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the login for a shared, "system" connection to the STOMP message broker.
|
||||
*/
|
||||
protected String getApplicationLogin() {
|
||||
return this.applicationLogin;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the passcode for a "system" TCP connection used to send messages to the STOMP
|
||||
* broker without having a client session (e.g. REST/HTTP request handling method).
|
||||
@@ -102,10 +83,14 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the passcode for a shared, "system" connection to the STOMP message broker.
|
||||
* Configure whether the {@link StompBrokerRelayMessageHandler} should start
|
||||
* automatically when the Spring ApplicationContext is refreshed.
|
||||
* <p>
|
||||
* The default setting is {@code true}.
|
||||
*/
|
||||
protected String getApplicationPasscode() {
|
||||
return this.applicationPasscode;
|
||||
public StompBrokerRelayRegistration setAutoStartup(boolean autoStartup) {
|
||||
this.autoStartup = autoStartup;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
@@ -116,6 +101,7 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
|
||||
handler.setRelayPort(this.relayPort);
|
||||
handler.setSystemLogin(this.applicationLogin);
|
||||
handler.setSystemPasscode(this.applicationPasscode);
|
||||
handler.setAutoStartup(this.autoStartup);
|
||||
return handler;
|
||||
}
|
||||
|
||||
|
||||
@@ -67,7 +67,7 @@ public abstract class WebSocketMessageBrokerConfigurationSupport {
|
||||
@Bean
|
||||
public SubProtocolWebSocketHandler subProtocolWebSocketHandler() {
|
||||
SubProtocolWebSocketHandler wsHandler = new SubProtocolWebSocketHandler(webSocketRequestChannel());
|
||||
webSocketReplyChannel().subscribe(wsHandler);
|
||||
webSocketResponseChannel().subscribe(wsHandler);
|
||||
return wsHandler;
|
||||
}
|
||||
|
||||
@@ -109,7 +109,7 @@ public abstract class WebSocketMessageBrokerConfigurationSupport {
|
||||
}
|
||||
|
||||
@Bean
|
||||
public SubscribableChannel webSocketReplyChannel() {
|
||||
public SubscribableChannel webSocketResponseChannel() {
|
||||
return new ExecutorSubscribableChannel(webSocketChannelExecutor());
|
||||
}
|
||||
|
||||
@@ -125,7 +125,7 @@ public abstract class WebSocketMessageBrokerConfigurationSupport {
|
||||
@Bean
|
||||
public AnnotationMethodMessageHandler annotationMethodMessageHandler() {
|
||||
AnnotationMethodMessageHandler handler =
|
||||
new AnnotationMethodMessageHandler(brokerMessagingTemplate(), webSocketReplyChannel());
|
||||
new AnnotationMethodMessageHandler(brokerMessagingTemplate(), webSocketResponseChannel());
|
||||
handler.setDestinationPrefixes(getMessageBrokerConfigurer().getAnnotationMethodDestinationPrefixes());
|
||||
handler.setMessageConverter(brokerMessageConverter());
|
||||
webSocketRequestChannel().subscribe(handler);
|
||||
@@ -140,7 +140,7 @@ public abstract class WebSocketMessageBrokerConfigurationSupport {
|
||||
}
|
||||
else {
|
||||
webSocketRequestChannel().subscribe(handler);
|
||||
brokerMessageChannel().subscribe(handler);
|
||||
brokerChannel().subscribe(handler);
|
||||
return handler;
|
||||
}
|
||||
}
|
||||
@@ -153,14 +153,14 @@ public abstract class WebSocketMessageBrokerConfigurationSupport {
|
||||
}
|
||||
else {
|
||||
webSocketRequestChannel().subscribe(handler);
|
||||
brokerMessageChannel().subscribe(handler);
|
||||
brokerChannel().subscribe(handler);
|
||||
return handler;
|
||||
}
|
||||
}
|
||||
|
||||
protected final MessageBrokerConfigurer getMessageBrokerConfigurer() {
|
||||
if (this.messageBrokerConfigurer == null) {
|
||||
MessageBrokerConfigurer configurer = new MessageBrokerConfigurer(webSocketReplyChannel());
|
||||
MessageBrokerConfigurer configurer = new MessageBrokerConfigurer(webSocketResponseChannel());
|
||||
configureMessageBroker(configurer);
|
||||
this.messageBrokerConfigurer = configurer;
|
||||
}
|
||||
@@ -175,19 +175,19 @@ public abstract class WebSocketMessageBrokerConfigurationSupport {
|
||||
UserDestinationMessageHandler handler = new UserDestinationMessageHandler(
|
||||
brokerMessagingTemplate(), userQueueSuffixResolver());
|
||||
webSocketRequestChannel().subscribe(handler);
|
||||
brokerMessageChannel().subscribe(handler);
|
||||
brokerChannel().subscribe(handler);
|
||||
return handler;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public SimpMessageSendingOperations brokerMessagingTemplate() {
|
||||
SimpMessagingTemplate template = new SimpMessagingTemplate(webSocketRequestChannel());
|
||||
SimpMessagingTemplate template = new SimpMessagingTemplate(brokerChannel());
|
||||
template.setMessageConverter(brokerMessageConverter());
|
||||
return template;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public SubscribableChannel brokerMessageChannel() {
|
||||
public SubscribableChannel brokerChannel() {
|
||||
return new ExecutorSubscribableChannel(); // synchronous
|
||||
}
|
||||
|
||||
|
||||
@@ -47,6 +47,8 @@ public abstract class AbstractBrokerMessageHandler
|
||||
|
||||
private AtomicBoolean brokerAvailable = new AtomicBoolean(false);
|
||||
|
||||
private boolean autoStartup = true;
|
||||
|
||||
private Object lifecycleMonitor = new Object();
|
||||
|
||||
private volatile boolean running = false;
|
||||
@@ -71,9 +73,13 @@ public abstract class AbstractBrokerMessageHandler
|
||||
return this.eventPublisher;
|
||||
}
|
||||
|
||||
public void setAutoStartup(boolean autoStartup) {
|
||||
this.autoStartup = autoStartup;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAutoStartup() {
|
||||
return true;
|
||||
return this.autoStartup;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -78,7 +78,7 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati
|
||||
|
||||
private final SimpMessageSendingOperations brokerTemplate;
|
||||
|
||||
private final SimpMessageSendingOperations webSocketReplyTemplate;
|
||||
private final SimpMessageSendingOperations webSocketResponseTemplate;
|
||||
|
||||
private Collection<String> destinationPrefixes;
|
||||
|
||||
@@ -106,15 +106,15 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati
|
||||
|
||||
/**
|
||||
* @param brokerTemplate a messaging template to sending messages to the broker
|
||||
* @param webSocketReplyChannel the channel for messages to WebSocket clients
|
||||
* @param webSocketResponseChannel the channel for messages to WebSocket clients
|
||||
*/
|
||||
public AnnotationMethodMessageHandler(SimpMessageSendingOperations brokerTemplate,
|
||||
MessageChannel webSocketReplyChannel) {
|
||||
MessageChannel webSocketResponseChannel) {
|
||||
|
||||
Assert.notNull(brokerTemplate, "brokerTemplate is required");
|
||||
Assert.notNull(webSocketReplyChannel, "webSocketReplyChannel is required");
|
||||
Assert.notNull(webSocketResponseChannel, "webSocketReplyChannel is required");
|
||||
this.brokerTemplate = brokerTemplate;
|
||||
this.webSocketReplyTemplate = new SimpMessagingTemplate(webSocketReplyChannel);
|
||||
this.webSocketResponseTemplate = new SimpMessagingTemplate(webSocketResponseChannel);
|
||||
}
|
||||
|
||||
|
||||
@@ -129,7 +129,7 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati
|
||||
public void setMessageConverter(MessageConverter<?> converter) {
|
||||
this.messageConverter = converter;
|
||||
if (converter != null) {
|
||||
((AbstractMessageSendingTemplate<?>) this.webSocketReplyTemplate).setMessageConverter(converter);
|
||||
((AbstractMessageSendingTemplate<?>) this.webSocketResponseTemplate).setMessageConverter(converter);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -181,7 +181,7 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati
|
||||
|
||||
// Annotation-based return value types
|
||||
this.returnValueHandlers.addHandler(new ReplyToMethodReturnValueHandler(this.brokerTemplate));
|
||||
this.returnValueHandlers.addHandler(new SubscriptionMethodReturnValueHandler(this.webSocketReplyTemplate));
|
||||
this.returnValueHandlers.addHandler(new SubscriptionMethodReturnValueHandler(this.webSocketResponseTemplate));
|
||||
|
||||
// custom return value types
|
||||
this.returnValueHandlers.addHandlers(this.customReturnValueHandlers);
|
||||
@@ -221,14 +221,14 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati
|
||||
final Class<A> annotationType, MappingInfoCreator<A> mappingInfoCreator,
|
||||
Map<MappingInfo, HandlerMethod> handlerMethods) {
|
||||
|
||||
Set<Method> messageMethods = HandlerMethodSelector.selectMethods(handlerType, new MethodFilter() {
|
||||
Set<Method> methods = HandlerMethodSelector.selectMethods(handlerType, new MethodFilter() {
|
||||
@Override
|
||||
public boolean matches(Method method) {
|
||||
return AnnotationUtils.findAnnotation(method, annotationType) != null;
|
||||
}
|
||||
});
|
||||
|
||||
for (Method method : messageMethods) {
|
||||
for (Method method : methods) {
|
||||
A annotation = AnnotationUtils.findAnnotation(method, annotationType);
|
||||
HandlerMethod hm = createHandlerMethod(handler, method);
|
||||
handlerMethods.put(mappingInfoCreator.create(annotation), hm);
|
||||
|
||||
@@ -127,4 +127,10 @@ public abstract class AbstractMessageChannel implements MessageChannel, BeanName
|
||||
|
||||
protected abstract boolean sendInternal(Message<?> message, long timeout);
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "MessageChannel [name=" + this.beanName + "]";
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user