Selector header name is exposed for configuration

Issue: SPR-16732
This commit is contained in:
Rossen Stoyanchev
2018-04-16 23:56:30 -04:00
parent 586be50109
commit ff2228fdaf
8 changed files with 150 additions and 34 deletions

View File

@@ -44,6 +44,7 @@ import org.springframework.util.Assert;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.PathMatcher;
import org.springframework.util.StringUtils;
/**
* Implementation of {@link SubscriptionRegistry} that stores subscriptions
@@ -73,6 +74,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
private volatile int cacheLimit = DEFAULT_CACHE_LIMIT;
@Nullable
private String selectorHeaderName = "selector";
private volatile boolean selectorHeaderInUse = false;
@@ -114,26 +116,28 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
}
/**
* Configure the name of a selector header that a subscription message can
* have in order to filter messages based on their headers. The value of the
* header can use Spring EL expressions against message headers.
* <p>For example the following expression expects a header called "foo" to
* have the value "bar":
* Configure the name of a header that a subscription message can have for
* the purpose of filtering messages matched to the subscription. The header
* value is expected to be a Spring EL boolean expression to be applied to
* the headers of messages matched to the subscription.
* <p>For example:
* <pre>
* headers.foo == 'bar'
* </pre>
* <p>By default this is set to "selector".
* <p>By default this is set to "selector". You can set it to a different
* name, or to {@code null} to turn off support for a selector header.
* @param selectorHeaderName the name to use for a selector header
* @since 4.2
*/
public void setSelectorHeaderName(String selectorHeaderName) {
Assert.notNull(selectorHeaderName, "'selectorHeaderName' must not be null");
this.selectorHeaderName = selectorHeaderName;
public void setSelectorHeaderName(@Nullable String selectorHeaderName) {
this.selectorHeaderName = StringUtils.hasText(selectorHeaderName) ? selectorHeaderName : null;
}
/**
* Return the name for the selector header.
* Return the name for the selector header name.
* @since 4.2
*/
@Nullable
public String getSelectorHeaderName() {
return this.selectorHeaderName;
}
@@ -143,25 +147,32 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
protected void addSubscriptionInternal(
String sessionId, String subsId, String destination, Message<?> message) {
Expression expression = getSelectorExpression(message.getHeaders());
this.subscriptionRegistry.addSubscription(sessionId, subsId, destination, expression);
this.destinationCache.updateAfterNewSubscription(destination, sessionId, subsId);
}
@Nullable
private Expression getSelectorExpression(MessageHeaders headers) {
Expression expression = null;
MessageHeaders headers = message.getHeaders();
String selector = SimpMessageHeaderAccessor.getFirstNativeHeader(getSelectorHeaderName(), headers);
if (selector != null) {
try {
expression = this.expressionParser.parseExpression(selector);
this.selectorHeaderInUse = true;
if (logger.isTraceEnabled()) {
logger.trace("Subscription selector: [" + selector + "]");
if (getSelectorHeaderName() != null) {
String selector = SimpMessageHeaderAccessor.getFirstNativeHeader(getSelectorHeaderName(), headers);
if (selector != null) {
try {
expression = this.expressionParser.parseExpression(selector);
this.selectorHeaderInUse = true;
if (logger.isTraceEnabled()) {
logger.trace("Subscription selector: [" + selector + "]");
}
}
}
catch (Throwable ex) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to parse selector: " + selector, ex);
catch (Throwable ex) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to parse selector: " + selector, ex);
}
}
}
}
this.subscriptionRegistry.addSubscription(sessionId, subsId, destination, expression);
this.destinationCache.updateAfterNewSubscription(destination, sessionId, subsId);
return expression;
}
@Override

View File

@@ -51,9 +51,6 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
private static final byte[] EMPTY_PAYLOAD = new byte[0];
private final Map<String, SessionInfo> sessions = new ConcurrentHashMap<>();
private SubscriptionRegistry subscriptionRegistry;
@Nullable
private PathMatcher pathMatcher;
@@ -61,6 +58,9 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
@Nullable
private Integer cacheLimit;
@Nullable
private String selectorHeaderName = "selector";
@Nullable
private TaskScheduler taskScheduler;
@@ -68,10 +68,15 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
private long[] heartbeatValue;
@Nullable
private ScheduledFuture<?> heartbeatFuture;
private MessageHeaderInitializer headerInitializer;
private SubscriptionRegistry subscriptionRegistry;
private final Map<String, SessionInfo> sessions = new ConcurrentHashMap<>();
@Nullable
private MessageHeaderInitializer headerInitializer;
private ScheduledFuture<?> heartbeatFuture;
/**
@@ -102,6 +107,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
this.subscriptionRegistry = subscriptionRegistry;
initPathMatcherToUse();
initCacheLimitToUse();
initSelectorHeaderNameToUse();
}
public SubscriptionRegistry getSubscriptionRegistry() {
@@ -149,6 +155,33 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
}
}
/**
* Configure the name of a header that a subscription message can have for
* the purpose of filtering messages matched to the subscription. The header
* value is expected to be a Spring EL boolean expression to be applied to
* the headers of messages matched to the subscription.
* <p>For example:
* <pre>
* headers.foo == 'bar'
* </pre>
* <p>By default this is set to "selector". You can set it to a different
* name, or to {@code null} to turn off support for a selector header.
* @param selectorHeaderName the name to use for a selector header
* @since 4.3.17
* @see #setSubscriptionRegistry
* @see DefaultSubscriptionRegistry#setSelectorHeaderName(String)
*/
public void setSelectorHeaderName(@Nullable String selectorHeaderName) {
this.selectorHeaderName = selectorHeaderName;
initSelectorHeaderNameToUse();
}
private void initSelectorHeaderNameToUse() {
if (this.subscriptionRegistry instanceof DefaultSubscriptionRegistry) {
((DefaultSubscriptionRegistry) this.subscriptionRegistry).setSelectorHeaderName(this.selectorHeaderName);
}
}
/**
* Configure the {@link org.springframework.scheduling.TaskScheduler} to
* use for providing heartbeat support. Setting this property also sets the

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -36,6 +36,9 @@ public class SimpleBrokerRegistration extends AbstractBrokerRegistration {
@Nullable
private long[] heartbeat;
@Nullable
private String selectorHeaderName = "selector";
public SimpleBrokerRegistration(SubscribableChannel inChannel, MessageChannel outChannel, String[] prefixes) {
super(inChannel, outChannel, prefixes);
@@ -68,6 +71,24 @@ public class SimpleBrokerRegistration extends AbstractBrokerRegistration {
return this;
}
/**
* Configure the name of a header that a subscription message can have for
* the purpose of filtering messages matched to the subscription. The header
* value is expected to be a Spring EL boolean expression to be applied to
* the headers of messages matched to the subscription.
* <p>For example:
* <pre>
* headers.foo == 'bar'
* </pre>
* <p>By default this is set to "selector". You can set it to a different
* name, or to {@code null} to turn off support for a selector header.
* @param selectorHeaderName the name to use for a selector header
* @since 4.3.17
*/
public void setSelectorHeaderName(@Nullable String selectorHeaderName) {
this.selectorHeaderName = selectorHeaderName;
}
@Override
protected SimpleBrokerMessageHandler getMessageHandler(SubscribableChannel brokerChannel) {
@@ -79,6 +100,7 @@ public class SimpleBrokerRegistration extends AbstractBrokerRegistration {
if (this.heartbeat != null) {
handler.setHeartbeatValue(this.heartbeat);
}
handler.setSelectorHeaderName(this.selectorHeaderName);
return handler;
}