Consistent use of @Nullable across the codebase (even for internals)

Beyond just formally declaring the current behavior, this revision actually enforces non-null behavior in selected signatures now, not tolerating null values anymore when not explicitly documented. It also changes some utility methods with historic null-in/null-out tolerance towards enforced non-null return values, making them a proper citizen in non-null assignments.

Some issues are left as to-do: in particular a thorough revision of spring-test, and a few tests with unclear failures (ignored as "TODO: NULLABLE") to be sorted out in a follow-up commit.

Issue: SPR-15540
This commit is contained in:
Juergen Hoeller
2017-06-07 14:17:48 +02:00
parent ffc3f6d87d
commit f813712f5b
1493 changed files with 10670 additions and 9172 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -117,8 +117,8 @@ public class MessageHeaders implements Map<String, Object>, Serializable {
* @param id the {@link #ID} header value
* @param timestamp the {@link #TIMESTAMP} header value
*/
protected MessageHeaders(@Nullable Map<String, Object> headers, UUID id, Long timestamp) {
this.headers = (headers != null ? new HashMap<>(headers) : new HashMap<String, Object>());
protected MessageHeaders(@Nullable Map<String, Object> headers, @Nullable UUID id, @Nullable Long timestamp) {
this.headers = (headers != null ? new HashMap<>(headers) : new HashMap<>());
if (id == null) {
this.headers.put(ID, getIdGenerator().generateId());

View File

@@ -17,6 +17,7 @@
package org.springframework.messaging;
import org.springframework.core.NestedRuntimeException;
import org.springframework.lang.Nullable;
/**
* The base exception for any failures related to messaging.
@@ -32,7 +33,7 @@ public class MessagingException extends NestedRuntimeException {
public MessagingException(Message<?> message) {
super(null);
super(null, null);
this.failedMessage = message;
}
@@ -41,7 +42,7 @@ public class MessagingException extends NestedRuntimeException {
this.failedMessage = null;
}
public MessagingException(String description, Throwable cause) {
public MessagingException(@Nullable String description, @Nullable Throwable cause) {
super(description, cause);
this.failedMessage = null;
}
@@ -56,7 +57,7 @@ public class MessagingException extends NestedRuntimeException {
this.failedMessage = message;
}
public MessagingException(Message<?> message, String description, Throwable cause) {
public MessagingException(Message<?> message, @Nullable String description, @Nullable Throwable cause) {
super(description, cause);
this.failedMessage = message;
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -192,34 +192,37 @@ public abstract class AbstractMessageConverter implements SmartMessageConverter
return null;
}
payload = convertToInternal(payload, headers, conversionHint);
if (payload == null) {
Object payloadToUse = convertToInternal(payload, headers, conversionHint);
if (payloadToUse == null) {
return null;
}
MimeType mimeType = getDefaultContentType(payload);
MimeType mimeType = getDefaultContentType(payloadToUse);
if (headers != null) {
MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(headers, MessageHeaderAccessor.class);
if (accessor != null && accessor.isMutable()) {
accessor.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, mimeType);
return MessageBuilder.createMessage(payload, accessor.getMessageHeaders());
if (mimeType != null) {
accessor.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, mimeType);
}
return MessageBuilder.createMessage(payloadToUse, accessor.getMessageHeaders());
}
}
MessageBuilder<?> builder = MessageBuilder.withPayload(payload);
MessageBuilder<?> builder = MessageBuilder.withPayload(payloadToUse);
if (headers != null) {
builder.copyHeaders(headers);
}
builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, mimeType);
if (mimeType != null) {
builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, mimeType);
}
return builder.build();
}
protected boolean canConvertTo(Object payload, MessageHeaders headers) {
Class<?> clazz = (payload != null ? payload.getClass() : null);
return (supports(clazz) && supportsMimeType(headers));
protected boolean canConvertTo(Object payload, @Nullable MessageHeaders headers) {
return (supports(payload.getClass()) && supportsMimeType(headers));
}
protected boolean supportsMimeType(MessageHeaders headers) {
protected boolean supportsMimeType(@Nullable MessageHeaders headers) {
if (getSupportedMimeTypes().isEmpty()) {
return true;
}
@@ -235,8 +238,9 @@ public abstract class AbstractMessageConverter implements SmartMessageConverter
return false;
}
protected MimeType getMimeType(MessageHeaders headers) {
return (this.contentTypeResolver != null ? this.contentTypeResolver.resolve(headers) : null);
@Nullable
protected MimeType getMimeType(@Nullable MessageHeaders headers) {
return (headers != null && this.contentTypeResolver != null ? this.contentTypeResolver.resolve(headers) : null);
}
@@ -258,7 +262,9 @@ public abstract class AbstractMessageConverter implements SmartMessageConverter
* @since 4.2
*/
@Nullable
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
protected Object convertFromInternal(
Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
return null;
}
@@ -273,7 +279,9 @@ public abstract class AbstractMessageConverter implements SmartMessageConverter
* @since 4.2
*/
@Nullable
protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers, @Nullable Object conversionHint) {
protected Object convertToInternal(
Object payload, @Nullable MessageHeaders headers, @Nullable Object conversionHint) {
return null;
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -42,13 +42,17 @@ public class ByteArrayMessageConverter extends AbstractMessageConverter {
@Override
@Nullable
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
protected Object convertFromInternal(
Message<?> message, @Nullable Class<?> targetClass, @Nullable Object conversionHint) {
return message.getPayload();
}
@Override
@Nullable
protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers, @Nullable Object conversionHint) {
protected Object convertToInternal(
Object payload, @Nullable MessageHeaders headers, @Nullable Object conversionHint) {
return payload;
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@ package org.springframework.messaging.converter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.InvalidMimeTypeException;
import org.springframework.util.MimeType;
/**
@@ -31,16 +32,13 @@ public interface ContentTypeResolver {
/**
* Determine the {@link MimeType} of a message from the given MessageHeaders.
*
* @param headers the headers to use for the resolution
* @return the resolved {@code MimeType} or {@code null} if none found
*
* @return the resolved {@code MimeType}, or {@code null} if none found
* @throws org.springframework.util.InvalidMimeTypeException if the content type
* is a String that cannot be parsed
* @throws java.lang.IllegalArgumentException if there is a content type but
* its type is unknown
* is a String that cannot be parsed
* @throws IllegalArgumentException if there is a content type but its type is unknown
*/
@Nullable
MimeType resolve(MessageHeaders headers);
MimeType resolve(@Nullable MessageHeaders headers) throws InvalidMimeTypeException;
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
package org.springframework.messaging.converter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.MimeType;
@@ -53,12 +54,15 @@ public class DefaultContentTypeResolver implements ContentTypeResolver {
@Override
public MimeType resolve(MessageHeaders headers) {
public MimeType resolve(@Nullable MessageHeaders headers) {
if (headers == null || headers.get(MessageHeaders.CONTENT_TYPE) == null) {
return this.defaultMimeType;
}
Object value = headers.get(MessageHeaders.CONTENT_TYPE);
if (value instanceof MimeType) {
if (value == null) {
return null;
}
else if (value instanceof MimeType) {
return (MimeType) value;
}
else if (value instanceof String) {

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -59,10 +59,7 @@ public class GenericMessageConverter extends SimpleMessageConverter {
@Override
public Object fromMessage(Message<?> message, Class<?> targetClass) {
Object payload = message.getPayload();
if (targetClass == null) {
return payload;
}
if (payload != null && this.conversionService.canConvert(payload.getClass(), targetClass)) {
if (this.conversionService.canConvert(payload.getClass(), targetClass)) {
try {
return this.conversionService.convert(payload, targetClass);
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -139,7 +139,7 @@ public class MappingJackson2MessageConverter extends AbstractMessageConverter {
}
@Override
protected boolean canConvertFrom(Message<?> message, Class<?> targetClass) {
protected boolean canConvertFrom(Message<?> message, @Nullable Class<?> targetClass) {
if (targetClass == null || !supportsMimeType(message.getHeaders())) {
return false;
}
@@ -156,8 +156,8 @@ public class MappingJackson2MessageConverter extends AbstractMessageConverter {
}
@Override
protected boolean canConvertTo(Object payload, MessageHeaders headers) {
if (payload == null || !supportsMimeType(headers)) {
protected boolean canConvertTo(Object payload, @Nullable MessageHeaders headers) {
if (!supportsMimeType(headers)) {
return false;
}
if (!logger.isWarnEnabled()) {
@@ -179,7 +179,7 @@ public class MappingJackson2MessageConverter extends AbstractMessageConverter {
* (typically a {@link JsonMappingException})
* @since 4.3
*/
protected void logWarningIfNecessary(Type type, Throwable cause) {
protected void logWarningIfNecessary(Type type, @Nullable Throwable cause) {
if (cause != null && !(cause instanceof JsonMappingException && cause.getMessage().startsWith("Can not find"))) {
String msg = "Failed to evaluate Jackson " + (type instanceof JavaType ? "de" : "") +
"serialization for type [" + type + "]";
@@ -269,7 +269,7 @@ public class MappingJackson2MessageConverter extends AbstractMessageConverter {
* @since 4.2
*/
@Nullable
protected Class<?> getSerializationView(Object conversionHint) {
protected Class<?> getSerializationView(@Nullable Object conversionHint) {
if (conversionHint instanceof MethodParameter) {
MethodParameter param = (MethodParameter) conversionHint;
JsonView annotation = (param.getParameterIndex() >= 0 ?
@@ -303,8 +303,8 @@ public class MappingJackson2MessageConverter extends AbstractMessageConverter {
* @param contentType the MIME type from the MessageHeaders, if any
* @return the JSON encoding to use (never {@code null})
*/
protected JsonEncoding getJsonEncoding(MimeType contentType) {
if ((contentType != null) && (contentType.getCharset() != null)) {
protected JsonEncoding getJsonEncoding(@Nullable MimeType contentType) {
if (contentType != null && (contentType.getCharset() != null)) {
Charset charset = contentType.getCharset();
for (JsonEncoding encoding : JsonEncoding.values()) {
if (charset.name().equals(encoding.getJavaName())) {

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -178,7 +178,7 @@ public class MarshallingMessageConverter extends AbstractMessageConverter {
payload = writer.toString();
}
}
catch (Exception ex) {
catch (Throwable ex) {
throw new MessageConversionException("Could not marshal XML: " + ex.getMessage(), ex);
}
return payload;

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
package org.springframework.messaging.converter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
@@ -32,7 +33,7 @@ public class MessageConversionException extends MessagingException {
super(description);
}
public MessageConversionException(String description, Throwable cause) {
public MessageConversionException(@Nullable String description, @Nullable Throwable cause) {
super(description, cause);
}
@@ -40,7 +41,7 @@ public class MessageConversionException extends MessagingException {
super(failedMessage, description);
}
public MessageConversionException(Message<?> failedMessage, String description, Throwable cause) {
public MessageConversionException(Message<?> failedMessage, @Nullable String description, @Nullable Throwable cause) {
super(failedMessage, description, cause);
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View File

@@ -38,17 +38,11 @@ public class SimpleMessageConverter implements MessageConverter {
@Override
public Object fromMessage(Message<?> message, Class<?> targetClass) {
Object payload = message.getPayload();
if (targetClass == null) {
return payload;
}
return (ClassUtils.isAssignableValue(targetClass, payload) ? payload : null);
}
@Override
public Message<?> toMessage(Object payload, @Nullable MessageHeaders headers) {
if (payload == null) {
return null;
}
if (headers != null) {
MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(headers, MessageHeaderAccessor.class);
if (accessor != null && accessor.isMutable()) {

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
/**
@@ -42,6 +43,7 @@ public class StringMessageConverter extends AbstractMessageConverter {
public StringMessageConverter(Charset defaultCharset) {
super(new MimeType("text", "plain", defaultCharset));
Assert.notNull(defaultCharset, "Default Charset must not be null");
this.defaultCharset = defaultCharset;
}
@@ -59,7 +61,9 @@ public class StringMessageConverter extends AbstractMessageConverter {
}
@Override
protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers, @Nullable Object conversionHint) {
protected Object convertToInternal(
Object payload, @Nullable MessageHeaders headers, @Nullable Object conversionHint) {
if (byte[].class == getSerializedPayloadClass()) {
Charset charset = getContentTypeCharset(getMimeType(headers));
payload = ((String) payload).getBytes(charset);
@@ -67,7 +71,7 @@ public class StringMessageConverter extends AbstractMessageConverter {
return payload;
}
private Charset getContentTypeCharset(MimeType mimeType) {
private Charset getContentTypeCharset(@Nullable MimeType mimeType) {
if (mimeType != null && mimeType.getCharset() != null) {
return mimeType.getCharset();
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -69,6 +69,7 @@ public abstract class AbstractMessageSendingTemplate<D> implements MessageSendin
/**
* Return the configured default destination.
*/
@Nullable
public D getDefaultDestination() {
return this.defaultDestination;
}
@@ -120,7 +121,7 @@ public abstract class AbstractMessageSendingTemplate<D> implements MessageSendin
}
@Override
public void convertAndSend(D destination, Object payload, Map<String, Object> headers) throws MessagingException {
public void convertAndSend(D destination, Object payload, @Nullable Map<String, Object> headers) throws MessagingException {
convertAndSend(destination, payload, headers, null);
}
@@ -130,7 +131,7 @@ public abstract class AbstractMessageSendingTemplate<D> implements MessageSendin
}
@Override
public void convertAndSend(D destination, Object payload, MessagePostProcessor postProcessor)
public void convertAndSend(D destination, Object payload, @Nullable MessagePostProcessor postProcessor)
throws MessagingException {
convertAndSend(destination, payload, null, postProcessor);
@@ -153,7 +154,9 @@ public abstract class AbstractMessageSendingTemplate<D> implements MessageSendin
* @param postProcessor the post processor to apply to the message
* @return the converted message
*/
protected Message<?> doConvert(Object payload, Map<String, Object> headers, MessagePostProcessor postProcessor) {
protected Message<?> doConvert(Object payload, @Nullable Map<String, Object> headers,
@Nullable MessagePostProcessor postProcessor) {
MessageHeaders messageHeaders = null;
Object conversionHint = (headers != null ? headers.get(CONVERSION_HINT_HEADER) : null);
@@ -172,7 +175,7 @@ public abstract class AbstractMessageSendingTemplate<D> implements MessageSendin
((SmartMessageConverter) converter).toMessage(payload, messageHeaders, conversionHint) :
converter.toMessage(payload, messageHeaders));
if (message == null) {
String payloadType = (payload != null ? payload.getClass().getName() : null);
String payloadType = payload.getClass().getName();
Object contentType = (messageHeaders != null ? messageHeaders.get(MessageHeaders.CONTENT_TYPE) : null);
throw new MessageConversionException("Unable to convert payload with type='" + payloadType +
"', contentType='" + contentType + "', converter=[" + getMessageConverter() + "]");

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -43,6 +43,7 @@ public abstract class AbstractMessagingTemplate<D> extends AbstractMessageReceiv
return doSendAndReceive(destination, requestMessage);
}
@Nullable
protected abstract Message<?> doSendAndReceive(D destination, Message<?> requestMessage);
@@ -67,7 +68,9 @@ public abstract class AbstractMessagingTemplate<D> extends AbstractMessageReceiv
}
@Override
public <T> T convertSendAndReceive(D destination, Object request, Class<T> targetClass, MessagePostProcessor postProcessor) {
public <T> T convertSendAndReceive(D destination, Object request, Class<T> targetClass,
@Nullable MessagePostProcessor postProcessor) {
return convertSendAndReceive(destination, request, null, targetClass, postProcessor);
}

View File

@@ -16,6 +16,7 @@
package org.springframework.messaging.core;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessagingException;
/**
@@ -32,7 +33,7 @@ public class DestinationResolutionException extends MessagingException {
super(description);
}
public DestinationResolutionException(String description, Throwable cause) {
public DestinationResolutionException(@Nullable String description, @Nullable Throwable cause) {
super(description, cause);
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
package org.springframework.messaging.core;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
@@ -34,14 +35,16 @@ public interface DestinationResolvingMessageReceivingOperations<D> extends Messa
* Resolve the given destination name and receive a message from it.
* @param destinationName the destination name to resolve
*/
@Nullable
Message<?> receive(String destinationName) throws MessagingException;
/**
* Resolve the given destination name, receive a message from it, convert the
* payload to the specified target type.
* Resolve the given destination name, receive a message from it,
* convert the payload to the specified target type.
* @param destinationName the destination name to resolve
* @param targetClass the target class for the converted payload
*/
@Nullable
<T> T receiveAndConvert(String destinationName, Class<T> targetClass) throws MessagingException;
}

View File

@@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
@@ -192,6 +193,7 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
return doReceive(channel, this.receiveTimeout);
}
@Nullable
protected final Message<?> doReceive(MessageChannel channel, long timeout) {
Assert.notNull(channel, "MessageChannel is required");
Assert.state(channel instanceof PollableChannel, "A PollableChannel is required to receive messages");
@@ -242,19 +244,20 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
private long sendTimeout(Message<?> requestMessage) {
Long sendTimeout = headerToLong(requestMessage.getHeaders().get(this.sendTimeoutHeader));
return sendTimeout == null ? this.sendTimeout : sendTimeout;
return (sendTimeout != null ? sendTimeout : this.sendTimeout);
}
private long receiveTimeout(Message<?> requestMessage) {
Long receiveTimeout = headerToLong(requestMessage.getHeaders().get(this.receiveTimeoutHeader));
return receiveTimeout == null ? this.receiveTimeout : receiveTimeout;
return (receiveTimeout != null ? receiveTimeout : this.receiveTimeout);
}
private Long headerToLong(Object headerValue) {
@Nullable
private Long headerToLong(@Nullable Object headerValue) {
if (headerValue instanceof Number) {
return ((Number) headerValue).longValue();
}
else if(headerValue instanceof String) {
else if (headerValue instanceof String) {
return Long.parseLong((String) headerValue);
}
else {

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,8 @@ package org.springframework.messaging.handler;
import java.util.Collection;
import java.util.Iterator;
import org.springframework.lang.Nullable;
/**
* A base class for {@link MessageCondition} types providing implementations of
* {@link #equals(Object)}, {@link #hashCode()}, and {@link #toString()}.
@@ -29,7 +31,7 @@ import java.util.Iterator;
public abstract class AbstractMessageCondition<T extends AbstractMessageCondition<T>> implements MessageCondition<T> {
@Override
public boolean equals(Object obj) {
public boolean equals(@Nullable Object obj) {
if (this == obj) {
return true;
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.PathMatcher;
@@ -62,24 +63,17 @@ public class DestinationPatternsMessageCondition extends AbstractMessageConditio
* @param patterns the URL patterns to use; if 0, the condition will match to every request.
* @param pathMatcher the PathMatcher to use
*/
public DestinationPatternsMessageCondition(String[] patterns, PathMatcher pathMatcher) {
this(asList(patterns), pathMatcher);
public DestinationPatternsMessageCondition(String[] patterns, @Nullable PathMatcher pathMatcher) {
this(Arrays.asList(patterns), pathMatcher);
}
private DestinationPatternsMessageCondition(Collection<String> patterns, PathMatcher pathMatcher) {
private DestinationPatternsMessageCondition(Collection<String> patterns, @Nullable PathMatcher pathMatcher) {
this.pathMatcher = (pathMatcher != null ? pathMatcher : new AntPathMatcher());
this.patterns = Collections.unmodifiableSet(prependLeadingSlash(patterns, this.pathMatcher));
}
private static List<String> asList(String... patterns) {
return (patterns != null ? Arrays.asList(patterns) : Collections.emptyList());
}
private static Set<String> prependLeadingSlash(Collection<String> patterns, PathMatcher pathMatcher) {
if (patterns == null) {
return Collections.emptySet();
}
boolean slashSeparator = pathMatcher.combine("a", "a").equals("a/a");
Set<String> result = new LinkedHashSet<>(patterns.size());
for (String pattern : patterns) {
@@ -190,9 +184,12 @@ public class DestinationPatternsMessageCondition extends AbstractMessageConditio
@Override
public int compareTo(DestinationPatternsMessageCondition other, Message<?> message) {
String destination = (String) message.getHeaders().get(LOOKUP_DESTINATION_HEADER);
if (destination == null) {
return 0;
}
Comparator<String> patternComparator = this.pathMatcher.getPatternComparator(destination);
Iterator<String> iterator = patterns.iterator();
Iterator<String> iterator = this.patterns.iterator();
Iterator<String> iteratorOther = other.patterns.iterator();
while (iterator.hasNext() && iteratorOther.hasNext()) {
int result = patternComparator.compare(iterator.next(), iteratorOther.next());

View File

@@ -28,6 +28,7 @@ import org.springframework.core.GenericTypeResolver;
import org.springframework.core.MethodParameter;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.annotation.SynthesizingMethodParameter;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
@@ -107,7 +108,8 @@ public class HandlerMethod {
Assert.notNull(method, "Method is required");
this.bean = beanName;
this.beanFactory = beanFactory;
this.beanType = ClassUtils.getUserClass(beanFactory.getType(beanName));
Class<?> beanType = beanFactory.getType(beanName);
this.beanType = (beanType != null ? ClassUtils.getUserClass(beanType) : null);
this.method = method;
this.bridgedMethod = BridgeMethodResolver.findBridgedMethod(method);
this.parameters = initMethodParameters();
@@ -222,6 +224,7 @@ public class HandlerMethod {
* @return the annotation, or {@code null} if none found
* @see AnnotatedElementUtils#findMergedAnnotation
*/
@Nullable
public <A extends Annotation> A getMethodAnnotation(Class<A> annotationType) {
return AnnotatedElementUtils.findMergedAnnotation(this.method, annotationType);
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
package org.springframework.messaging.handler;
import org.springframework.core.Ordered;
import org.springframework.lang.Nullable;
/**
* Represents a Spring-managed bean with cross-cutting functionality to be
@@ -39,6 +40,7 @@ public interface MessagingAdviceBean extends Ordered {
* <p>If the bean type is a CGLIB-generated class, the original user-defined
* class is returned.
*/
@Nullable
Class<?> getBeanType();
/**

View File

@@ -49,12 +49,8 @@ public class AnnotationExceptionHandlerMethodResolver extends AbstractExceptionH
private static Map<Class<? extends Throwable>, Method> initExceptionMappings(Class<?> handlerType) {
Map<Method, MessageExceptionHandler> methods = MethodIntrospector.selectMethods(handlerType,
new MethodIntrospector.MetadataLookup<MessageExceptionHandler>() {
@Override
public MessageExceptionHandler inspect(Method method) {
return AnnotationUtils.findAnnotation(method, MessageExceptionHandler.class);
}
});
(MethodIntrospector.MetadataLookup<MessageExceptionHandler>) method ->
AnnotationUtils.findAnnotation(method, MessageExceptionHandler.class));
Map<Class<? extends Throwable>, Method> result = new HashMap<>();
for (Map.Entry<Method, MessageExceptionHandler> entry : methods.entrySet()) {

View File

@@ -80,7 +80,7 @@ public class MessageMethodArgumentResolver implements HandlerMethodArgumentResol
}
Object payload = message.getPayload();
if (payload == null || targetPayloadType.isInstance(payload)) {
if (targetPayloadType.isInstance(payload)) {
return message;
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
package org.springframework.messaging.handler.invocation;
import org.springframework.core.MethodParameter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
/**
@@ -36,7 +37,7 @@ public abstract class AbstractAsyncReturnValueHandler implements AsyncHandlerMet
}
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType, Message<?> message) {
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType, Message<?> message) {
// Should never be called since we return "true" from isAsyncReturnValue
throw new IllegalStateException("Unexpected invocation");
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -86,11 +86,9 @@ public abstract class AbstractMethodMessageHandler<T>
private Collection<String> destinationPrefixes = new ArrayList<>();
private final List<HandlerMethodArgumentResolver> customArgumentResolvers =
new ArrayList<>(4);
private final List<HandlerMethodArgumentResolver> customArgumentResolvers = new ArrayList<>(4);
private final List<HandlerMethodReturnValueHandler> customReturnValueHandlers =
new ArrayList<>(4);
private final List<HandlerMethodReturnValueHandler> customReturnValueHandlers = new ArrayList<>(4);
private final HandlerMethodArgumentResolverComposite argumentResolvers =
new HandlerMethodArgumentResolverComposite();
@@ -119,7 +117,7 @@ public abstract class AbstractMethodMessageHandler<T>
* <p>By default, no prefixes are configured in which case all messages are
* eligible for handling.
*/
public void setDestinationPrefixes(Collection<String> prefixes) {
public void setDestinationPrefixes(@Nullable Collection<String> prefixes) {
this.destinationPrefixes.clear();
if (prefixes != null) {
for (String prefix : prefixes) {
@@ -140,7 +138,7 @@ public abstract class AbstractMethodMessageHandler<T>
* Sets the list of custom {@code HandlerMethodArgumentResolver}s that will be used
* after resolvers for supported argument type.
*/
public void setCustomArgumentResolvers(List<HandlerMethodArgumentResolver> customArgumentResolvers) {
public void setCustomArgumentResolvers(@Nullable List<HandlerMethodArgumentResolver> customArgumentResolvers) {
this.customArgumentResolvers.clear();
if (customArgumentResolvers != null) {
this.customArgumentResolvers.addAll(customArgumentResolvers);
@@ -158,7 +156,7 @@ public abstract class AbstractMethodMessageHandler<T>
* Set the list of custom {@code HandlerMethodReturnValueHandler}s that will be used
* after return value handlers for known types.
*/
public void setCustomReturnValueHandlers(List<HandlerMethodReturnValueHandler> customReturnValueHandlers) {
public void setCustomReturnValueHandlers(@Nullable List<HandlerMethodReturnValueHandler> customReturnValueHandlers) {
this.customReturnValueHandlers.clear();
if (customReturnValueHandlers != null) {
this.customReturnValueHandlers.addAll(customReturnValueHandlers);
@@ -177,7 +175,7 @@ public abstract class AbstractMethodMessageHandler<T>
* the ones configured by default. This is an advanced option; for most use cases
* it should be sufficient to use {@link #setCustomArgumentResolvers}.
*/
public void setArgumentResolvers(List<HandlerMethodArgumentResolver> argumentResolvers) {
public void setArgumentResolvers(@Nullable List<HandlerMethodArgumentResolver> argumentResolvers) {
if (argumentResolvers == null) {
this.argumentResolvers.clear();
return;
@@ -197,7 +195,7 @@ public abstract class AbstractMethodMessageHandler<T>
* the ones configured by default. This is an advanced option; for most use cases
* it should be sufficient to use {@link #setCustomReturnValueHandlers}.
*/
public void setReturnValueHandlers(List<HandlerMethodReturnValueHandler> returnValueHandlers) {
public void setReturnValueHandlers(@Nullable List<HandlerMethodReturnValueHandler> returnValueHandlers) {
if (returnValueHandlers == null) {
this.returnValueHandlers.clear();
return;
@@ -217,6 +215,7 @@ public abstract class AbstractMethodMessageHandler<T>
this.applicationContext = applicationContext;
}
@Nullable
public ApplicationContext getApplicationContext() {
return this.applicationContext;
}
@@ -236,7 +235,7 @@ public abstract class AbstractMethodMessageHandler<T>
if (!beanName.startsWith(SCOPED_TARGET_NAME_PREFIX)) {
Class<?> beanType = null;
try {
beanType = getApplicationContext().getType(beanName);
beanType = this.applicationContext.getType(beanName);
}
catch (Throwable ex) {
// An unresolvable bean type, probably from a lazy bean - let's ignore it.
@@ -281,21 +280,17 @@ public abstract class AbstractMethodMessageHandler<T>
protected final void detectHandlerMethods(final Object handler) {
Class<?> handlerType = (handler instanceof String ?
this.applicationContext.getType((String) handler) : handler.getClass());
final Class<?> userType = ClassUtils.getUserClass(handlerType);
Map<Method, T> methods = MethodIntrospector.selectMethods(userType,
new MethodIntrospector.MetadataLookup<T>() {
@Override
public T inspect(Method method) {
return getMappingForMethod(method, userType);
}
});
if (logger.isDebugEnabled()) {
logger.debug(methods.size() + " message handler methods found on " + userType + ": " + methods);
}
for (Map.Entry<Method, T> entry : methods.entrySet()) {
registerHandlerMethod(handler, entry.getKey(), entry.getValue());
if (handlerType != null) {
final Class<?> userType = ClassUtils.getUserClass(handlerType);
Map<Method, T> methods = MethodIntrospector.selectMethods(userType,
(MethodIntrospector.MetadataLookup<T>) method -> getMappingForMethod(method, userType));
if (logger.isDebugEnabled()) {
logger.debug(methods.size() + " message handler methods found on " + userType + ": " + methods);
}
for (Map.Entry<Method, T> entry : methods.entrySet()) {
registerHandlerMethod(handler, entry.getKey(), entry.getValue());
}
}
}
@@ -402,6 +397,7 @@ public abstract class AbstractMethodMessageHandler<T>
headerAccessor.setImmutable();
}
@Nullable
protected abstract String getDestination(Message<?> message);
/**
@@ -412,7 +408,7 @@ public abstract class AbstractMethodMessageHandler<T>
* <p>If there are no destination prefixes, return the destination as is.
*/
@Nullable
protected String getLookupDestination(String destination) {
protected String getLookupDestination(@Nullable String destination) {
if (destination == null) {
return null;
}
@@ -508,7 +504,7 @@ public abstract class AbstractMethodMessageHandler<T>
if (void.class == returnType.getParameterType()) {
return;
}
if (this.returnValueHandlers.isAsyncReturnValue(returnValue, returnType)) {
if (returnValue != null && this.returnValueHandlers.isAsyncReturnValue(returnValue, returnType)) {
ListenableFuture<?> future = this.returnValueHandlers.toListenableFuture(returnValue, returnType);
if (future != null) {
future.addCallback(new ReturnValueListenableFutureCallback(invocable, message));

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.core.MethodParameter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
@@ -55,7 +56,7 @@ public class HandlerMethodArgumentResolverComposite implements HandlerMethodArgu
* Add the given {@link HandlerMethodArgumentResolver}s.
* @since 4.3
*/
public HandlerMethodArgumentResolverComposite addResolvers(HandlerMethodArgumentResolver... resolvers) {
public HandlerMethodArgumentResolverComposite addResolvers(@Nullable HandlerMethodArgumentResolver... resolvers) {
if (resolvers != null) {
for (HandlerMethodArgumentResolver resolver : resolvers) {
this.argumentResolvers.add(resolver);
@@ -67,7 +68,7 @@ public class HandlerMethodArgumentResolverComposite implements HandlerMethodArgu
/**
* Add the given {@link HandlerMethodArgumentResolver}s.
*/
public HandlerMethodArgumentResolverComposite addResolvers(List<? extends HandlerMethodArgumentResolver> argumentResolvers) {
public HandlerMethodArgumentResolverComposite addResolvers(@Nullable List<? extends HandlerMethodArgumentResolver> argumentResolvers) {
if (argumentResolvers != null) {
for (HandlerMethodArgumentResolver resolver : argumentResolvers) {
this.argumentResolvers.add(resolver);
@@ -115,6 +116,7 @@ public class HandlerMethodArgumentResolverComposite implements HandlerMethodArgu
/**
* Find a registered {@link HandlerMethodArgumentResolver} that supports the given method parameter.
*/
@Nullable
private HandlerMethodArgumentResolver getArgumentResolver(MethodParameter parameter) {
HandlerMethodArgumentResolver result = this.argumentResolverCache.get(parameter);
if (result == null) {

View File

@@ -17,6 +17,7 @@
package org.springframework.messaging.handler.invocation;
import org.springframework.core.MethodParameter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
/**
@@ -40,13 +41,13 @@ public interface HandlerMethodReturnValueHandler {
/**
* Handle the given return value.
* @param returnValue the value returned from the handler method
* @param returnType the type of the return value. This type must have
* previously been passed to
* {@link #supportsReturnType(org.springframework.core.MethodParameter)}
* and it must have returned {@code true}
* @param returnType the type of the return value. This type must have previously
* been passed to {@link #supportsReturnType(org.springframework.core.MethodParameter)}
* and it must have returned {@code true}.
* @param message the message that caused this method to be called
* @throws Exception if the return value handling results in an error
*/
void handleReturnValue(Object returnValue, MethodParameter returnType, Message<?> message) throws Exception;
void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType, Message<?> message)
throws Exception;
}

View File

@@ -59,15 +59,17 @@ public class HandlerMethodReturnValueHandlerComposite implements AsyncHandlerMet
/**
* Add the given {@link HandlerMethodReturnValueHandler}.
*/
public HandlerMethodReturnValueHandlerComposite addHandler(HandlerMethodReturnValueHandler returnValuehandler) {
this.returnValueHandlers.add(returnValuehandler);
public HandlerMethodReturnValueHandlerComposite addHandler(HandlerMethodReturnValueHandler returnValueHandler) {
this.returnValueHandlers.add(returnValueHandler);
return this;
}
/**
* Add the given {@link HandlerMethodReturnValueHandler}s.
*/
public HandlerMethodReturnValueHandlerComposite addHandlers(List<? extends HandlerMethodReturnValueHandler> handlers) {
public HandlerMethodReturnValueHandlerComposite addHandlers(
@Nullable List<? extends HandlerMethodReturnValueHandler> handlers) {
if (handlers != null) {
for (HandlerMethodReturnValueHandler handler : handlers) {
this.returnValueHandlers.add(handler);
@@ -92,7 +94,7 @@ public class HandlerMethodReturnValueHandlerComposite implements AsyncHandlerMet
}
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType, Message<?> message)
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType, Message<?> message)
throws Exception {
HandlerMethodReturnValueHandler handler = getReturnValueHandler(returnType);

View File

@@ -28,6 +28,7 @@ import org.springframework.core.ResolvableType;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.HandlerMethod;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;
@@ -104,6 +105,7 @@ public class InvocableHandlerMethod extends HandlerMethod {
* @exception Exception raised if no suitable argument resolver can be found,
* or if the method raised an exception
*/
@Nullable
public Object invoke(Message<?> message, Object... providedArgs) throws Exception {
Object[] args = getMethodArgumentValues(message, providedArgs);
if (logger.isTraceEnabled()) {
@@ -161,9 +163,6 @@ public class InvocableHandlerMethod extends HandlerMethod {
*/
@Nullable
private Object resolveProvidedArgument(MethodParameter parameter, Object... providedArgs) {
if (providedArgs == null) {
return null;
}
for (Object providedArg : providedArgs) {
if (parameter.getParameterType().isInstance(providedArg)) {
return providedArg;
@@ -176,6 +175,7 @@ public class InvocableHandlerMethod extends HandlerMethod {
/**
* Invoke the handler method with the given argument values.
*/
@Nullable
protected Object doInvoke(Object... args) throws Exception {
ReflectionUtils.makeAccessible(getBridgedMethod());
try {
@@ -282,14 +282,16 @@ public class InvocableHandlerMethod extends HandlerMethod {
return this.returnValue.getClass();
}
if (!ResolvableType.NONE.equals(this.returnType)) {
return this.returnType.resolve();
return this.returnType.resolve(Object.class);
}
return super.getParameterType();
}
@Override
public Type getGenericParameterType() {
return this.returnType.getType();
Type returnType = this.returnType.getType();
Assert.state(returnType != null, "No return type");
return returnType;
}
@Override

View File

@@ -62,7 +62,7 @@ public class MethodArgumentResolutionException extends MessagingException {
private static String getMethodParameterMessage(MethodParameter parameter) {
return "Could not resolve method parameter at index " + parameter.getParameterIndex() +
" in " + parameter.getMethod().toGenericString();
" in " + parameter.getExecutable().toGenericString();
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,7 +20,6 @@ import org.springframework.core.NamedThreadLocal;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
/**
* Holder class to expose SiMP attributes associated with a session (e.g. WebSocket)
* in the form of a thread-bound {@link SimpAttributes} object.
@@ -45,7 +44,7 @@ public abstract class SimpAttributesContextHolder {
* Bind the given SimpAttributes to the current thread,
* @param attributes the RequestAttributes to expose
*/
public static void setAttributes(SimpAttributes attributes) {
public static void setAttributes(@Nullable SimpAttributes attributes) {
if (attributes != null) {
attributesHolder.set(attributes);
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -116,6 +116,7 @@ public class SimpMessageHeaderAccessor extends NativeMessageHeaderAccessor {
}
}
@Nullable
public SimpMessageType getMessageType() {
return (SimpMessageType) getHeader(MESSAGE_TYPE_HEADER);
}
@@ -125,6 +126,7 @@ public class SimpMessageHeaderAccessor extends NativeMessageHeaderAccessor {
setHeader(DESTINATION_HEADER, destination);
}
@Nullable
public String getDestination() {
return (String) getHeader(DESTINATION_HEADER);
}
@@ -133,6 +135,7 @@ public class SimpMessageHeaderAccessor extends NativeMessageHeaderAccessor {
setHeader(SUBSCRIPTION_ID_HEADER, subscriptionId);
}
@Nullable
public String getSubscriptionId() {
return (String) getHeader(SUBSCRIPTION_ID_HEADER);
}
@@ -142,8 +145,9 @@ public class SimpMessageHeaderAccessor extends NativeMessageHeaderAccessor {
}
/**
* @return the id of the current session
* Return the id of the current session.
*/
@Nullable
public String getSessionId() {
return (String) getHeader(SESSION_ID_HEADER);
}
@@ -159,6 +163,7 @@ public class SimpMessageHeaderAccessor extends NativeMessageHeaderAccessor {
* Return the attributes associated with the current session.
*/
@SuppressWarnings("unchecked")
@Nullable
public Map<String, Object> getSessionAttributes() {
return (Map<String, Object>) getHeader(SESSION_ATTRIBUTES);
}
@@ -170,6 +175,7 @@ public class SimpMessageHeaderAccessor extends NativeMessageHeaderAccessor {
/**
* Return the user associated with the current session.
*/
@Nullable
public Principal getUser() {
return (Principal) getHeader(USER_HEADER);
}
@@ -206,7 +212,8 @@ public class SimpMessageHeaderAccessor extends NativeMessageHeaderAccessor {
private StringBuilder getBaseLogMessage() {
StringBuilder sb = new StringBuilder();
sb.append(getMessageType().name());
SimpMessageType messageType = getMessageType();
sb.append(messageType != null ? messageType.name() : SimpMessageType.OTHER);
if (getDestination() != null) {
sb.append(" destination=").append(getDestination());
}
@@ -246,31 +253,38 @@ public class SimpMessageHeaderAccessor extends NativeMessageHeaderAccessor {
return new SimpMessageHeaderAccessor(message);
}
@Nullable
public static SimpMessageType getMessageType(Map<String, Object> headers) {
return (SimpMessageType) headers.get(MESSAGE_TYPE_HEADER);
}
@Nullable
public static String getDestination(Map<String, Object> headers) {
return (String) headers.get(DESTINATION_HEADER);
}
@Nullable
public static String getSubscriptionId(Map<String, Object> headers) {
return (String) headers.get(SUBSCRIPTION_ID_HEADER);
}
@Nullable
public static String getSessionId(Map<String, Object> headers) {
return (String) headers.get(SESSION_ID_HEADER);
}
@SuppressWarnings("unchecked")
@Nullable
public static Map<String, Object> getSessionAttributes(Map<String, Object> headers) {
return (Map<String, Object>) headers.get(SESSION_ATTRIBUTES);
}
@Nullable
public static Principal getUser(Map<String, Object> headers) {
return (Principal) headers.get(USER_HEADER);
}
@Nullable
public static long[] getHeartbeat(Map<String, Object> headers) {
return (long[]) headers.get(HEART_BEAT_HEADER);
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
package org.springframework.messaging.simp;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
import org.springframework.messaging.handler.MessageCondition;
@@ -91,7 +92,7 @@ public class SimpMessageMappingInfo implements MessageCondition<SimpMessageMappi
@Override
public boolean equals(Object obj) {
public boolean equals(@Nullable Object obj) {
if (this == obj) {
return true;
}
@@ -110,11 +111,7 @@ public class SimpMessageMappingInfo implements MessageCondition<SimpMessageMappi
@Override
public String toString() {
StringBuilder builder = new StringBuilder("{");
builder.append(this.destinationConditions);
builder.append(",messageType=").append(this.messageTypeMessageCondition);
builder.append('}');
return builder.toString();
return "{" + this.destinationConditions + ",messageType=" + this.messageTypeMessageCondition + '}';
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -84,7 +84,7 @@ public interface SimpMessageSendingOperations extends MessageSendingOperations<S
* @param payload the payload to send (may be {@code null})
* @param headers the message headers (may be {@code null})
*/
void convertAndSendToUser(String user, String destination, @Nullable Object payload, @Nullable Map<String, Object> headers)
void convertAndSendToUser(String user, String destination, Object payload, Map<String, Object> headers)
throws MessagingException;
/**
@@ -94,8 +94,8 @@ public interface SimpMessageSendingOperations extends MessageSendingOperations<S
* @param payload the payload to send (may be {@code null})
* @param postProcessor a postProcessor to post-process or modify the created message
*/
void convertAndSendToUser(String user, String destination, @Nullable Object payload,
MessagePostProcessor postProcessor) throws MessagingException;
void convertAndSendToUser(String user, String destination, Object payload, MessagePostProcessor postProcessor)
throws MessagingException;
/**
* Send a message to the given user.

View File

@@ -118,6 +118,7 @@ public class SimpMessagingTemplate extends AbstractMessageSendingTemplate<String
/**
* Return the configured header initializer.
*/
@Nullable
public MessageHeaderInitializer getHeaderInitializer() {
return this.headerInitializer;
}
@@ -203,22 +204,23 @@ public class SimpMessagingTemplate extends AbstractMessageSendingTemplate<String
}
@Override
public void convertAndSendToUser(String user, String destination, @Nullable Object payload,
public void convertAndSendToUser(String user, String destination, Object payload,
@Nullable Map<String, Object> headers) throws MessagingException {
convertAndSendToUser(user, destination, payload, headers, null);
}
@Override
public void convertAndSendToUser(String user, String destination, @Nullable Object payload,
MessagePostProcessor postProcessor) throws MessagingException {
public void convertAndSendToUser(String user, String destination, Object payload,
@Nullable MessagePostProcessor postProcessor) throws MessagingException {
convertAndSendToUser(user, destination, payload, null, postProcessor);
}
@Override
public void convertAndSendToUser(String user, String destination, Object payload, @Nullable Map<String, Object> headers,
@Nullable MessagePostProcessor postProcessor) throws MessagingException {
public void convertAndSendToUser(String user, String destination, Object payload,
@Nullable Map<String, Object> headers, @Nullable MessagePostProcessor postProcessor)
throws MessagingException {
Assert.notNull(user, "User must not be null");
user = StringUtils.replace(user, "/", "%2F");

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@ package org.springframework.messaging.simp;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.config.Scope;
import org.springframework.util.Assert;
/**
* A {@link Scope} implementation exposing the attributes of a SiMP session
@@ -34,17 +35,18 @@ public class SimpSessionScope implements Scope {
@Override
public Object get(String name, ObjectFactory<?> objectFactory) {
SimpAttributes simpAttributes = SimpAttributesContextHolder.currentAttributes();
Object value = simpAttributes.getAttribute(name);
if (value != null) {
return value;
Object scopedObject = simpAttributes.getAttribute(name);
if (scopedObject != null) {
return scopedObject;
}
synchronized (simpAttributes.getSessionMutex()) {
value = simpAttributes.getAttribute(name);
if (value == null) {
value = objectFactory.getObject();
simpAttributes.setAttribute(name, value);
scopedObject = simpAttributes.getAttribute(name);
if (scopedObject == null) {
scopedObject = objectFactory.getObject();
Assert.state(scopedObject != null, "Scoped object resolved to null");
simpAttributes.setAttribute(name, scopedObject);
}
return value;
return scopedObject;
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -125,8 +125,9 @@ public class SendToMethodReturnValueHandler implements HandlerMethodReturnValueH
}
/**
* @return the configured header initializer.
* Return the configured header initializer.
*/
@Nullable
public MessageHeaderInitializer getHeaderInitializer() {
return this.headerInitializer;
}
@@ -142,7 +143,9 @@ public class SendToMethodReturnValueHandler implements HandlerMethodReturnValueH
}
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType, Message<?> message) throws Exception {
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType, Message<?> message)
throws Exception {
if (returnValue == null) {
return;
}
@@ -152,7 +155,7 @@ public class SendToMethodReturnValueHandler implements HandlerMethodReturnValueH
PlaceholderResolver varResolver = initVarResolver(headers);
Object annotation = findAnnotation(returnType);
if (annotation != null && annotation instanceof SendToUser) {
if (annotation instanceof SendToUser) {
SendToUser sendToUser = (SendToUser) annotation;
boolean broadcast = sendToUser.broadcast();
String user = getUserName(message, headers);
@@ -176,7 +179,7 @@ public class SendToMethodReturnValueHandler implements HandlerMethodReturnValueH
}
}
}
else {
else if (annotation instanceof SendTo) {
SendTo sendTo = (SendTo) annotation;
String[] destinations = getTargetDestinations(sendTo, message, this.defaultDestinationPrefix);
for (String destination : destinations) {
@@ -189,8 +192,8 @@ public class SendToMethodReturnValueHandler implements HandlerMethodReturnValueH
@Nullable
private Object findAnnotation(MethodParameter returnType) {
Annotation[] anns = new Annotation[4];
anns[0] = AnnotatedElementUtils.findMergedAnnotation(returnType.getMethod(), SendToUser.class);
anns[1] = AnnotatedElementUtils.findMergedAnnotation(returnType.getMethod(), SendTo.class);
anns[0] = AnnotatedElementUtils.findMergedAnnotation(returnType.getExecutable(), SendToUser.class);
anns[1] = AnnotatedElementUtils.findMergedAnnotation(returnType.getExecutable(), SendTo.class);
anns[2] = AnnotatedElementUtils.findMergedAnnotation(returnType.getDeclaringClass(), SendToUser.class);
anns[3] = AnnotatedElementUtils.findMergedAnnotation(returnType.getDeclaringClass(), SendTo.class);
@@ -233,13 +236,14 @@ public class SendToMethodReturnValueHandler implements HandlerMethodReturnValueH
return null;
}
protected String[] getTargetDestinations(Annotation annotation, Message<?> message, String defaultPrefix) {
protected String[] getTargetDestinations(@Nullable Annotation annotation, Message<?> message, String defaultPrefix) {
if (annotation != null) {
String[] value = (String[]) AnnotationUtils.getValue(annotation);
if (!ObjectUtils.isEmpty(value)) {
return value;
}
}
String name = DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER;
String destination = (String) message.getHeaders().get(name);
if (!StringUtils.hasText(destination)) {
@@ -250,7 +254,7 @@ public class SendToMethodReturnValueHandler implements HandlerMethodReturnValueH
new String[] {defaultPrefix + destination} : new String[] {defaultPrefix + '/' + destination});
}
private MessageHeaders createHeaders(String sessionId, MethodParameter returnType) {
private MessageHeaders createHeaders(@Nullable String sessionId, MethodParameter returnType) {
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
if (getHeaderInitializer() != null) {
getHeaderInitializer().initHeaders(headerAccessor);
@@ -274,7 +278,7 @@ public class SendToMethodReturnValueHandler implements HandlerMethodReturnValueH
private final Map<String, String> vars;
public DestinationVariablePlaceholderResolver(Map<String, String> vars) {
public DestinationVariablePlaceholderResolver(@Nullable Map<String, String> vars) {
this.vars = vars;
}

View File

@@ -32,6 +32,7 @@ import org.springframework.context.SmartLifecycle;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.convert.ConversionService;
import org.springframework.format.support.DefaultFormattingConversionService;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
@@ -146,11 +147,12 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan
* depending on the configured {@code PathMatcher}.
*/
@Override
public void setDestinationPrefixes(Collection<String> prefixes) {
public void setDestinationPrefixes(@Nullable Collection<String> prefixes) {
super.setDestinationPrefixes(appendSlashes(prefixes));
}
private static Collection<String> appendSlashes(Collection<String> prefixes) {
@Nullable
private static Collection<String> appendSlashes(@Nullable Collection<String> prefixes) {
if (CollectionUtils.isEmpty(prefixes)) {
return prefixes;
}
@@ -172,9 +174,7 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan
*/
public void setMessageConverter(MessageConverter converter) {
this.messageConverter = converter;
if (converter != null) {
((AbstractMessageSendingTemplate<?>) this.clientMessagingTemplate).setMessageConverter(converter);
}
((AbstractMessageSendingTemplate<?>) this.clientMessagingTemplate).setMessageConverter(converter);
}
/**
@@ -434,7 +434,7 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan
}
@Override
protected String getLookupDestination(String destination) {
protected String getLookupDestination(@Nullable String destination) {
if (destination == null) {
return null;
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.MethodParameter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.core.MessageSendingOperations;
@@ -88,6 +89,7 @@ public class SubscriptionMethodReturnValueHandler implements HandlerMethodReturn
/**
* Return the configured header initializer.
*/
@Nullable
public MessageHeaderInitializer getHeaderInitializer() {
return this.headerInitializer;
}
@@ -101,7 +103,7 @@ public class SubscriptionMethodReturnValueHandler implements HandlerMethodReturn
}
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType, Message<?> message)
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType, Message<?> message)
throws Exception {
if (returnValue == null) {
@@ -109,12 +111,16 @@ public class SubscriptionMethodReturnValueHandler implements HandlerMethodReturn
}
MessageHeaders headers = message.getHeaders();
String destination = SimpMessageHeaderAccessor.getDestination(headers);
String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers);
String destination = SimpMessageHeaderAccessor.getDestination(headers);
if (subscriptionId == null) {
throw new IllegalStateException("No subscriptionId in " + message +
throw new IllegalStateException("No simpSubscriptionId in " + message +
" returned by: " + returnType.getMethod());
}
if (destination == null) {
throw new IllegalStateException("No simpDestination in " + message +
" returned by: " + returnType.getMethod());
}
@@ -125,12 +131,14 @@ public class SubscriptionMethodReturnValueHandler implements HandlerMethodReturn
this.messagingTemplate.convertAndSend(destination, returnValue, headersToSend);
}
private MessageHeaders createHeaders(String sessionId, String subscriptionId, MethodParameter returnType) {
private MessageHeaders createHeaders(@Nullable String sessionId, String subscriptionId, MethodParameter returnType) {
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
if (getHeaderInitializer() != null) {
getHeaderInitializer().initHeaders(accessor);
}
accessor.setSessionId(sessionId);
if (sessionId != null) {
accessor.setSessionId(sessionId);
}
accessor.setSubscriptionId(subscriptionId);
accessor.setHeader(SimpMessagingTemplate.CONVERSION_HINT_HEADER, returnType);
accessor.setLeaveMutable(true);

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@@ -95,7 +96,7 @@ public abstract class AbstractBrokerMessageHandler
* @param destinationPrefixes prefixes to use to filter out messages
*/
public AbstractBrokerMessageHandler(SubscribableChannel inboundChannel, MessageChannel outboundChannel,
SubscribableChannel brokerChannel, Collection<String> destinationPrefixes) {
SubscribableChannel brokerChannel, @Nullable Collection<String> destinationPrefixes) {
Assert.notNull(inboundChannel, "'inboundChannel' must not be null");
Assert.notNull(outboundChannel, "'outboundChannel' must not be null");
@@ -105,7 +106,7 @@ public abstract class AbstractBrokerMessageHandler
this.clientOutboundChannel = outboundChannel;
this.brokerChannel = brokerChannel;
destinationPrefixes = (destinationPrefixes != null) ? destinationPrefixes : Collections.emptyList();
destinationPrefixes = (destinationPrefixes != null ? destinationPrefixes : Collections.emptyList());
this.destinationPrefixes = Collections.unmodifiableCollection(destinationPrefixes);
}
@@ -241,8 +242,8 @@ public abstract class AbstractBrokerMessageHandler
protected abstract void handleMessageInternal(Message<?> message);
protected boolean checkDestinationPrefix(String destination) {
if ((destination == null) || CollectionUtils.isEmpty(this.destinationPrefixes)) {
protected boolean checkDestinationPrefix(@Nullable String destination) {
if (destination == null || CollectionUtils.isEmpty(this.destinationPrefixes)) {
return true;
}
for (String prefix : this.destinationPrefixes) {

View File

@@ -214,7 +214,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
context.getPropertyAccessors().add(new SimpMessageHeaderPropertyAccessor());
}
try {
if (expression.getValue(context, boolean.class)) {
if (Boolean.TRUE.equals(expression.getValue(context, Boolean.class))) {
result.add(sessionId, subId);
}
}
@@ -368,6 +368,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
// sessionId -> SessionSubscriptionInfo
private final ConcurrentMap<String, SessionSubscriptionInfo> sessions = new ConcurrentHashMap<>();
@Nullable
public SessionSubscriptionInfo getSubscriptions(String sessionId) {
return this.sessions.get(sessionId);
}
@@ -377,7 +378,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
}
public SessionSubscriptionInfo addSubscription(String sessionId, String subscriptionId,
String destination, Expression selectorExpression) {
String destination, @Nullable Expression selectorExpression) {
SessionSubscriptionInfo info = this.sessions.get(sessionId);
if (info == null) {
@@ -391,6 +392,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
return info;
}
@Nullable
public SessionSubscriptionInfo removeSubscriptions(String sessionId) {
return this.sessions.remove(sessionId);
}
@@ -444,7 +446,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
return null;
}
public void addSubscription(String destination, String subscriptionId, Expression selectorExpression) {
public void addSubscription(String destination, String subscriptionId, @Nullable Expression selectorExpression) {
Set<Subscription> subs = this.destinationLookup.get(destination);
if (subs == null) {
synchronized (this.destinationLookup) {
@@ -460,7 +462,8 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
@Nullable
public String removeSubscription(String subscriptionId) {
for (Map.Entry<String, Set<DefaultSubscriptionRegistry.Subscription>> destinationEntry : this.destinationLookup.entrySet()) {
for (Map.Entry<String, Set<DefaultSubscriptionRegistry.Subscription>> destinationEntry :
this.destinationLookup.entrySet()) {
Set<Subscription> subs = destinationEntry.getValue();
if (subs != null) {
for (Subscription sub : subs) {
@@ -491,7 +494,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
private final Expression selectorExpression;
public Subscription(String id, Expression selector) {
public Subscription(String id, @Nullable Expression selector) {
Assert.notNull(id, "Subscription id must not be null");
this.id = id;
this.selectorExpression = selector;
@@ -501,6 +504,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
return this.id;
}
@Nullable
public Expression getSelectorExpression() {
return this.selectorExpression;
}
@@ -530,15 +534,17 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
}
@Override
public boolean canRead(EvaluationContext context, Object target, String name) {
public boolean canRead(EvaluationContext context, @Nullable Object target, String name) {
return true;
}
@Override
public TypedValue read(EvaluationContext context, Object target, String name) throws AccessException {
public TypedValue read(EvaluationContext context, @Nullable Object target, String name) throws AccessException {
Assert.state(target instanceof MessageHeaders, "No MessageHeaders");
MessageHeaders headers = (MessageHeaders) target;
SimpMessageHeaderAccessor accessor =
MessageHeaderAccessor.getAccessor(headers, SimpMessageHeaderAccessor.class);
Assert.state(accessor != null, "No SimpMessageHeaderAccessor");
Object value;
if ("destination".equalsIgnoreCase(name)) {
value = accessor.getDestination();
@@ -553,12 +559,12 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
}
@Override
public boolean canWrite(EvaluationContext context, Object target, String name) {
public boolean canWrite(EvaluationContext context, @Nullable Object target, String name) {
return false;
}
@Override
public void write(EvaluationContext context, Object target, String name, Object value) {
public void write(EvaluationContext context, @Nullable Object target, String name, @Nullable Object value) {
}
}

View File

@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
@@ -162,6 +163,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
* Return the configured TaskScheduler.
* @since 4.2
*/
@Nullable
public TaskScheduler getTaskScheduler() {
return this.taskScheduler;
}
@@ -176,7 +178,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
* @since 4.2
*/
public void setHeartbeatValue(long[] heartbeat) {
if (heartbeat == null || heartbeat.length != 2 || heartbeat[0] < 0 || heartbeat[1] < 0) {
if (heartbeat.length != 2 || heartbeat[0] < 0 || heartbeat[1] < 0) {
throw new IllegalArgumentException("Invalid heart-beat: " + Arrays.toString(heartbeat));
}
this.heartbeatValue = heartbeat;
@@ -186,6 +188,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
* The configured value for the heart-beat settings.
* @since 4.2
*/
@Nullable
public long[] getHeartbeatValue() {
return this.heartbeatValue;
}
@@ -204,6 +207,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
* Return the configured header initializer.
* @since 4.1
*/
@Nullable
public MessageHeaderInitializer getHeaderInitializer() {
return this.headerInitializer;
}
@@ -265,21 +269,27 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
}
else if (SimpMessageType.CONNECT.equals(messageType)) {
logMessage(message);
long[] clientHeartbeat = SimpMessageHeaderAccessor.getHeartbeat(headers);
long[] serverHeartbeat = getHeartbeatValue();
this.sessions.put(sessionId, new SessionInfo(sessionId, user, clientHeartbeat, serverHeartbeat));
SimpMessageHeaderAccessor connectAck = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
initHeaders(connectAck);
connectAck.setSessionId(sessionId);
connectAck.setUser(SimpMessageHeaderAccessor.getUser(headers));
connectAck.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, message);
connectAck.setHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER, serverHeartbeat);
Message<byte[]> messageOut = MessageBuilder.createMessage(EMPTY_PAYLOAD, connectAck.getMessageHeaders());
getClientOutboundChannel().send(messageOut);
if (sessionId != null) {
long[] clientHeartbeat = SimpMessageHeaderAccessor.getHeartbeat(headers);
long[] serverHeartbeat = getHeartbeatValue();
this.sessions.put(sessionId, new SessionInfo(sessionId, user, clientHeartbeat, serverHeartbeat));
SimpMessageHeaderAccessor connectAck = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
initHeaders(connectAck);
connectAck.setSessionId(sessionId);
if (user != null) {
connectAck.setUser(user);
}
connectAck.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, message);
connectAck.setHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER, serverHeartbeat);
Message<byte[]> messageOut = MessageBuilder.createMessage(EMPTY_PAYLOAD, connectAck.getMessageHeaders());
getClientOutboundChannel().send(messageOut);
}
}
else if (SimpMessageType.DISCONNECT.equals(messageType)) {
logMessage(message);
handleDisconnect(sessionId, user, message);
if (sessionId != null) {
handleDisconnect(sessionId, user, message);
}
}
else if (SimpMessageType.SUBSCRIBE.equals(messageType)) {
logMessage(message);
@@ -291,7 +301,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
}
}
private void updateSessionReadTime(String sessionId) {
private void updateSessionReadTime(@Nullable String sessionId) {
if (sessionId != null) {
SessionInfo info = this.sessions.get(sessionId);
if (info != null) {
@@ -314,12 +324,14 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
}
}
private void handleDisconnect(String sessionId, Principal user, Message<?> origMessage) {
private void handleDisconnect(String sessionId, @Nullable Principal user, @Nullable Message<?> origMessage) {
this.sessions.remove(sessionId);
this.subscriptionRegistry.unregisterAllSubscriptions(sessionId);
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.DISCONNECT_ACK);
accessor.setSessionId(sessionId);
accessor.setUser(user);
if (user != null) {
accessor.setUser(user);
}
if (origMessage != null) {
accessor.setHeader(SimpMessageHeaderAccessor.DISCONNECT_MESSAGE_HEADER, origMessage);
}
@@ -328,7 +340,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
getClientOutboundChannel().send(message);
}
protected void sendMessageToSubscribers(String destination, Message<?> message) {
protected void sendMessageToSubscribers(@Nullable String destination, Message<?> message) {
MultiValueMap<String,String> subscriptions = this.subscriptionRegistry.findSubscriptions(message);
if (!subscriptions.isEmpty() && logger.isDebugEnabled()) {
logger.debug("Broadcasting to " + subscriptions.size() + " sessions.");
@@ -382,7 +394,9 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
private volatile long lastWriteTime;
public SessionInfo(String sessiondId, Principal user, long[] clientHeartbeat, long[] serverHeartbeat) {
public SessionInfo(String sessiondId, @Nullable Principal user,
@Nullable long[] clientHeartbeat, @Nullable long[] serverHeartbeat) {
this.sessiondId = sessiondId;
this.user = user;
if (clientHeartbeat != null && serverHeartbeat != null) {

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler;
@@ -42,7 +43,7 @@ public abstract class AbstractBrokerRegistration {
public AbstractBrokerRegistration(SubscribableChannel clientInboundChannel,
MessageChannel clientOutboundChannel, String[] destinationPrefixes) {
MessageChannel clientOutboundChannel, @Nullable String[] destinationPrefixes) {
Assert.notNull(clientOutboundChannel, "'clientInboundChannel' must not be null");
Assert.notNull(clientOutboundChannel, "'clientOutboundChannel' must not be null");

View File

@@ -235,6 +235,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
* Provide access to the configured PatchMatcher for access from other
* configuration classes.
*/
@Nullable
public final PathMatcher getPathMatcher() {
return getBrokerRegistry().getPathMatcher();
}
@@ -308,7 +309,9 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
UserDestinationMessageHandler handler = new UserDestinationMessageHandler(clientInboundChannel(),
brokerChannel(), userDestinationResolver());
String destination = getBrokerRegistry().getUserDestinationBroadcast();
handler.setBroadcastDestination(destination);
if (destination != null) {
handler.setBroadcastDestination(destination);
}
return handler;
}
@@ -384,7 +387,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
if (prefix != null) {
resolver.setUserDestinationPrefix(prefix);
}
resolver.setPathMatcher(getBrokerRegistry().getPathMatcher());
resolver.setPathMatcher(getPathMatcher());
return resolver;
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2016 the original author or authors.7
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -62,9 +62,7 @@ public class ChannelRegistration {
* Configure interceptors for the message channel.
*/
public ChannelRegistration setInterceptors(ChannelInterceptor... interceptors) {
if (interceptors != null) {
this.interceptors.addAll(Arrays.asList(interceptors));
}
this.interceptors.addAll(Arrays.asList(interceptors));
return this;
}

View File

@@ -99,11 +99,13 @@ public class MessageBrokerRegistry {
return this.brokerChannelRegistration;
}
@Nullable
protected String getUserDestinationBroadcast() {
return (this.brokerRelayRegistration != null ?
this.brokerRelayRegistration.getUserDestinationBroadcast() : null);
}
@Nullable
protected String getUserRegistryBroadcast() {
return (this.brokerRelayRegistration != null ?
this.brokerRelayRegistration.getUserRegistryBroadcast() : null);
@@ -124,6 +126,7 @@ public class MessageBrokerRegistry {
return this;
}
@Nullable
protected Collection<String> getApplicationDestinationPrefixes() {
return (this.applicationDestinationPrefixes != null ?
Arrays.asList(this.applicationDestinationPrefixes) : null);
@@ -146,6 +149,7 @@ public class MessageBrokerRegistry {
return this;
}
@Nullable
protected String getUserDestinationPrefix() {
return this.userDestinationPrefix;
}
@@ -172,6 +176,7 @@ public class MessageBrokerRegistry {
return this;
}
@Nullable
protected PathMatcher getPathMatcher() {
return this.pathMatcher;
}

View File

@@ -160,7 +160,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
/**
* Configure the TaskScheduler to use for receipt tracking.
*/
public void setTaskScheduler(TaskScheduler taskScheduler) {
public void setTaskScheduler(@Nullable TaskScheduler taskScheduler) {
this.taskScheduler = taskScheduler;
}
@@ -227,6 +227,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
return receiptable;
}
@Nullable
private String checkOrAddReceipt(StompHeaders stompHeaders) {
String receiptId = stompHeaders.getReceipt();
if (isAutoReceiptEnabled() && receiptId == null) {
@@ -266,9 +267,11 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
}
private void execute(Message<byte[]> message) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (logger.isTraceEnabled()) {
logger.trace("Sending " + accessor.getDetailedLogMessage(message.getPayload()));
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (accessor != null) {
logger.trace("Sending " + accessor.getDetailedLogMessage(message.getPayload()));
}
}
TcpConnection<byte[]> conn = this.connection;
Assert.state(conn != null, "Connection closed");
@@ -333,7 +336,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
return receiptable;
}
private void unsubscribe(String id, StompHeaders stompHeaders) {
private void unsubscribe(String id, @Nullable StompHeaders stompHeaders) {
StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.UNSUBSCRIBE);
if (stompHeaders != null) {
accessor.addNativeHeaders(stompHeaders);
@@ -384,6 +387,8 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
@Override
public void handleMessage(Message<byte[]> message) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
Assert.state(accessor != null, "No StompHeaderAccessor");
accessor.setSessionId(this.sessionId);
StompCommand command = accessor.getCommand();
Map<String, List<String>> nativeHeaders = accessor.getNativeHeaders();
@@ -392,6 +397,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
if (logger.isTraceEnabled()) {
logger.trace("Received " + accessor.getDetailedLogMessage(message.getPayload()));
}
try {
if (StompCommand.MESSAGE.equals(command)) {
DefaultSubscription subscription = this.subscriptions.get(stompHeaders.getSubscription());
@@ -438,12 +444,16 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
handler.handleFrame(stompHeaders, null);
return;
}
Type type = handler.getPayloadType(stompHeaders);
Class<?> payloadType = ResolvableType.forType(type).resolve();
Object object = getMessageConverter().fromMessage(message, payloadType);
Type payloadType = handler.getPayloadType(stompHeaders);
Class<?> resolvedType = ResolvableType.forType(payloadType).resolve();
if (resolvedType == null) {
throw new MessageConversionException("Unresolvable payload type [" + payloadType +
"] from handler type [" + handler.getClass() + "]");
}
Object object = getMessageConverter().fromMessage(message, resolvedType);
if (object == null) {
throw new MessageConversionException("No suitable converter, payloadType=" + payloadType +
", handlerType=" + handler.getClass());
throw new MessageConversionException("No suitable converter for payload type [" + payloadType +
"] from handler type [" + handler.getClass() + "]");
}
handler.handleFrame(stompHeaders, object);
}
@@ -514,9 +524,9 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
private Boolean result;
public ReceiptHandler(String receiptId) {
public ReceiptHandler(@Nullable String receiptId) {
this.receiptId = receiptId;
if (this.receiptId != null) {
if (receiptId != null) {
initReceiptHandling();
}
}
@@ -638,8 +648,10 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
@Override
public void unsubscribe(@Nullable StompHeaders stompHeaders) {
String id = this.headers.getId();
DefaultStompSession.this.subscriptions.remove(id);
DefaultStompSession.this.unsubscribe(id, stompHeaders);
if (id != null) {
DefaultStompSession.this.subscriptions.remove(id);
DefaultStompSession.this.unsubscribe(id, stompHeaders);
}
}
@Override

View File

@@ -16,6 +16,7 @@
package org.springframework.messaging.simp.stomp;
import java.security.Principal;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -299,7 +300,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
* servers forward messages to each other (e.g. unresolved user destinations).
* @param subscriptions the destinations to subscribe to.
*/
public void setSystemSubscriptions(Map<String, MessageHandler> subscriptions) {
public void setSystemSubscriptions(@Nullable Map<String, MessageHandler> subscriptions) {
this.systemSubscriptions.clear();
if (subscriptions != null) {
this.systemSubscriptions.putAll(subscriptions);
@@ -328,6 +329,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
/**
* Return the configured virtual host value.
*/
@Nullable
public String getVirtualHost() {
return this.virtualHost;
}
@@ -345,6 +347,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
* invoked and this method is invoked before the handler is started and
* hence a default implementation initialized).
*/
@Nullable
public TcpOperations<byte[]> getTcpClient() {
return this.tcpClient;
}
@@ -362,6 +365,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
/**
* Return the configured header initializer.
*/
@Nullable
public MessageHeaderInitializer getHeaderInitializer() {
return this.headerInitializer;
}
@@ -399,7 +403,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
accessor.setLogin(this.systemLogin);
accessor.setPasscode(this.systemPasscode);
accessor.setHeartbeat(this.systemHeartbeatSendInterval, this.systemHeartbeatReceiveInterval);
accessor.setHost(getVirtualHost());
String virtualHost = getVirtualHost();
if (virtualHost != null) {
accessor.setHost(virtualHost);
}
accessor.setSessionId(SYSTEM_SESSION_ID);
if (logger.isDebugEnabled()) {
logger.debug("Forwarding " + accessor.getShortLogMessage(EMPTY_PAYLOAD));
@@ -443,7 +450,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
getHeaderInitializer().initHeaders(accessor);
}
accessor.setSessionId(sessionId);
accessor.setUser(SimpMessageHeaderAccessor.getUser(message.getHeaders()));
Principal user = SimpMessageHeaderAccessor.getUser(message.getHeaders());
if (user != null) {
accessor.setUser(user);
}
accessor.setMessage("Broker not available.");
MessageHeaders headers = accessor.getMessageHeaders();
getClientOutboundChannel().send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers));
@@ -564,6 +574,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return this.sessionId;
}
@Nullable
protected TcpConnection<byte[]> getTcpConnection() {
return this.tcpConnection;
}
@@ -616,22 +627,27 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private void sendStompErrorFrameToClient(String errorText) {
if (this.isRemoteClientSession) {
StompHeaderAccessor headerAccessor = StompHeaderAccessor.create(StompCommand.ERROR);
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR);
if (getHeaderInitializer() != null) {
getHeaderInitializer().initHeaders(headerAccessor);
getHeaderInitializer().initHeaders(accessor);
}
headerAccessor.setSessionId(this.sessionId);
headerAccessor.setUser(this.connectHeaders.getUser());
headerAccessor.setMessage(errorText);
Message<?> errorMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, headerAccessor.getMessageHeaders());
accessor.setSessionId(this.sessionId);
Principal user = this.connectHeaders.getUser();
if (user != null) {
accessor.setUser(user);
}
accessor.setMessage(errorText);
Message<?> errorMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
handleInboundMessage(errorMessage);
}
}
protected void handleInboundMessage(Message<?> message) {
if (this.isRemoteClientSession) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
accessor.setImmutable();
MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, null);
if (accessor != null) {
accessor.setImmutable();
}
StompBrokerRelayMessageHandler.this.getClientOutboundChannel().send(message);
}
}
@@ -639,8 +655,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@Override
public void handleMessage(Message<byte[]> message) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
Assert.state(accessor != null, "No StompHeaderAccessor");
accessor.setSessionId(this.sessionId);
accessor.setUser(this.connectHeaders.getUser());
Principal user = this.connectHeaders.getUser();
if (user != null) {
accessor.setUser(user);
}
StompCommand command = accessor.getCommand();
if (StompCommand.CONNECTED.equals(command)) {
@@ -915,7 +935,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@Override
protected void handleInboundMessage(Message<?> message) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.MESSAGE.equals(accessor.getCommand())) {
if (accessor != null && StompCommand.MESSAGE.equals(accessor.getCommand())) {
String destination = accessor.getDestination();
if (destination == null) {
if (logger.isDebugEnabled()) {

View File

@@ -18,6 +18,7 @@ package org.springframework.messaging.simp.stomp;
import java.util.Arrays;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.SimpleMessageConverter;
import org.springframework.scheduling.TaskScheduler;
@@ -82,6 +83,7 @@ public abstract class StompClientSupport {
/**
* The configured TaskScheduler.
*/
@Nullable
public TaskScheduler getTaskScheduler() {
return this.taskScheduler;
}
@@ -99,7 +101,7 @@ public abstract class StompClientSupport {
* http://stomp.github.io/stomp-specification-1.2.html#Heart-beating</a>
*/
public void setDefaultHeartbeat(long[] heartbeat) {
if (heartbeat == null || heartbeat.length != 2 || heartbeat[0] < 0 || heartbeat[1] < 0) {
if (heartbeat.length != 2 || heartbeat[0] < 0 || heartbeat[1] < 0) {
throw new IllegalArgumentException("Invalid heart-beat: " + Arrays.toString(heartbeat));
}
this.defaultHeartbeat = heartbeat;
@@ -118,7 +120,8 @@ public abstract class StompClientSupport {
* is set to "0,0", and {@code true} otherwise.
*/
public boolean isDefaultHeartbeatEnabled() {
return (getDefaultHeartbeat() != null && getDefaultHeartbeat()[0] != 0 && getDefaultHeartbeat()[1] != 0);
long[] heartbeat = getDefaultHeartbeat();
return (heartbeat[0] != 0 && heartbeat[1] != 0);
}
/**
@@ -144,7 +147,9 @@ public abstract class StompClientSupport {
* @param handler the handler for the STOMP session
* @return the created session
*/
protected ConnectionHandlingStompSession createSession(StompHeaders connectHeaders, StompSessionHandler handler) {
protected ConnectionHandlingStompSession createSession(
@Nullable StompHeaders connectHeaders, StompSessionHandler handler) {
connectHeaders = processConnectHeaders(connectHeaders);
DefaultStompSession session = new DefaultStompSession(handler, connectHeaders);
session.setMessageConverter(getMessageConverter());
@@ -159,7 +164,7 @@ public abstract class StompClientSupport {
* @param connectHeaders the headers to modify
* @return the modified headers
*/
protected StompHeaders processConnectHeaders(StompHeaders connectHeaders) {
protected StompHeaders processConnectHeaders(@Nullable StompHeaders connectHeaders) {
connectHeaders = (connectHeaders != null ? connectHeaders : new StompHeaders());
if (connectHeaders.getHeartbeat() == null) {
connectHeaders.setHeartbeat(getDefaultHeartbeat());

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -60,7 +60,7 @@ public class StompDecoder {
* Configure a {@link MessageHeaderInitializer} to apply to the headers of
* {@link Message}s from decoded STOMP frames.
*/
public void setHeaderInitializer(MessageHeaderInitializer headerInitializer) {
public void setHeaderInitializer(@Nullable MessageHeaderInitializer headerInitializer) {
this.headerInitializer = headerInitializer;
}
@@ -82,7 +82,6 @@ public class StompDecoder {
* @return the decoded messages, or an empty list if none
* @throws StompConversionException raised in case of decoding issues
*/
@Nullable
public List<Message<byte[]>> decode(ByteBuffer byteBuffer) {
return decode(byteBuffer, null);
}
@@ -123,7 +122,8 @@ public class StompDecoder {
/**
* Decode a single STOMP frame from the given {@code buffer} into a {@link Message}.
*/
private Message<byte[]> decodeMessage(ByteBuffer byteBuffer, MultiValueMap<String, String> headers) {
@Nullable
private Message<byte[]> decodeMessage(ByteBuffer byteBuffer, @Nullable MultiValueMap<String, String> headers) {
Message<byte[]> decodedMessage = null;
skipLeadingEol(byteBuffer);
@@ -144,9 +144,12 @@ public class StompDecoder {
payload = readPayload(byteBuffer, headerAccessor);
}
if (payload != null) {
if (payload.length > 0 && !headerAccessor.getCommand().isBodyAllowed()) {
throw new StompConversionException(headerAccessor.getCommand() +
" shouldn't have a payload: length=" + payload.length + ", headers=" + headers);
if (payload.length > 0) {
StompCommand stompCommand = headerAccessor.getCommand();
if (stompCommand != null && !stompCommand.isBodyAllowed()) {
throw new StompConversionException(headerAccessor.getCommand() +
" shouldn't have a payload: length=" + payload.length + ", headers=" + headers);
}
}
headerAccessor.updateSimpMessageHeadersFromStompHeaders();
headerAccessor.setLeaveMutable(true);

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
@@ -55,8 +56,7 @@ public class StompEncoder {
private static final int HEADER_KEY_CACHE_LIMIT = 32;
private final Map<String, byte[]> headerKeyAccessCache =
new ConcurrentHashMap<>(HEADER_KEY_CACHE_LIMIT);
private final Map<String, byte[]> headerKeyAccessCache = new ConcurrentHashMap<>(HEADER_KEY_CACHE_LIMIT);
@SuppressWarnings("serial")
private final Map<String, byte[]> headerKeyUpdateCache =
@@ -222,7 +222,7 @@ public class StompEncoder {
return (sb != null ? sb.toString() : inString);
}
private StringBuilder getStringBuilder(StringBuilder sb, String inString, int i) {
private StringBuilder getStringBuilder(@Nullable StringBuilder sb, String inString, int i) {
if (sb == null) {
sb = new StringBuilder(inString.length());
sb.append(inString.substring(0, i));

View File

@@ -39,7 +39,7 @@ public interface StompFrameHandler {
* Handle a STOMP frame with the payload converted to the target type returned
* from {@link #getPayloadType(StompHeaders)}.
* @param headers the headers of the frame
* @param payload the payload or {@code null} if there was no payload
* @param payload the payload, or {@code null} if there was no payload
*/
void handleFrame(StompHeaders headers, @Nullable Object payload);

View File

@@ -180,10 +180,9 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor {
return wrap(message);
}
Map<String, List<String>> getNativeHeaders() {
@SuppressWarnings("unchecked")
Map<String, List<String>> map = (Map<String, List<String>>) getHeader(NATIVE_HEADERS);
return (map != null ? map : Collections.emptyMap());
// Redeclared for visibility within simp.stomp
protected Map<String, List<String>> getNativeHeaders() {
return super.getNativeHeaders();
}
public StompCommand updateStompCommandAsClientMessage() {
@@ -251,6 +250,7 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor {
setNativeHeader(STOMP_HOST_HEADER, host);
}
@Nullable
public String getHost() {
return getFirstNativeHeader(STOMP_HOST_HEADER);
}
@@ -290,10 +290,8 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor {
@Nullable
public Integer getContentLength() {
if (containsNativeHeader(STOMP_CONTENT_LENGTH_HEADER)) {
return Integer.valueOf(getFirstNativeHeader(STOMP_CONTENT_LENGTH_HEADER));
}
return null;
String header = getFirstNativeHeader(STOMP_CONTENT_LENGTH_HEADER);
return (header != null ? Integer.valueOf(header) : null);
}
public void setContentLength(int contentLength) {
@@ -308,6 +306,7 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor {
setNativeHeader(STOMP_ACK_HEADER, ack);
}
@Nullable
public String getAck() {
return getFirstNativeHeader(STOMP_ACK_HEADER);
}
@@ -316,6 +315,7 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor {
setNativeHeader(STOMP_NACK_HEADER, nack);
}
@Nullable
public String getNack() {
return getFirstNativeHeader(STOMP_NACK_HEADER);
}
@@ -324,6 +324,7 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor {
setNativeHeader(STOMP_LOGIN_HEADER, login);
}
@Nullable
public String getLogin() {
return getFirstNativeHeader(STOMP_LOGIN_HEADER);
}
@@ -354,6 +355,7 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor {
setNativeHeader(STOMP_RECEIPT_ID_HEADER, receiptId);
}
@Nullable
public String getReceiptId() {
return getFirstNativeHeader(STOMP_RECEIPT_ID_HEADER);
}
@@ -362,10 +364,12 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor {
setNativeHeader(STOMP_RECEIPT_HEADER, receiptId);
}
@Nullable
public String getReceipt() {
return getFirstNativeHeader(STOMP_RECEIPT_HEADER);
}
@Nullable
public String getMessage() {
return getFirstNativeHeader(STOMP_MESSAGE_HEADER);
}
@@ -374,6 +378,7 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor {
setNativeHeader(STOMP_MESSAGE_HEADER, content);
}
@Nullable
public String getMessageId() {
return getFirstNativeHeader(STOMP_MESSAGE_ID_HEADER);
}
@@ -382,6 +387,7 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor {
setNativeHeader(STOMP_MESSAGE_ID_HEADER, id);
}
@Nullable
public String getVersion() {
return getFirstNativeHeader(STOMP_VERSION_HEADER);
}
@@ -428,11 +434,16 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor {
return super.getDetailedLogMessage(payload);
}
StringBuilder sb = new StringBuilder();
sb.append(command.name()).append(" ").append(getNativeHeaders()).append(appendSession());
sb.append(command.name()).append(" ");
Map<String, List<String>> nativeHeaders = getNativeHeaders();
if (nativeHeaders != null) {
sb.append(nativeHeaders);
}
sb.append(appendSession());
if (getUser() != null) {
sb.append(", user=").append(getUser().getName());
}
if (command.isBodyAllowed()) {
if (payload != null && command.isBodyAllowed()) {
sb.append(appendPayload(payload));
}
return sb.toString();

View File

@@ -138,6 +138,7 @@ public class StompHeaders implements MultiValueMap<String, String>, Serializable
/**
* Return the content-type header value.
*/
@Nullable
public MimeType getContentType() {
String value = getFirst(CONTENT_TYPE);
return (StringUtils.hasLength(value) ? MimeTypeUtils.parseMimeType(value) : null);
@@ -170,6 +171,7 @@ public class StompHeaders implements MultiValueMap<String, String>, Serializable
/**
* Get the receipt header.
*/
@Nullable
public String getReceipt() {
return getFirst(RECEIPT);
}
@@ -185,6 +187,7 @@ public class StompHeaders implements MultiValueMap<String, String>, Serializable
/**
* Get the host header.
*/
@Nullable
public String getHost() {
return getFirst(HOST);
}
@@ -200,6 +203,7 @@ public class StompHeaders implements MultiValueMap<String, String>, Serializable
/**
* Get the login header.
*/
@Nullable
public String getLogin() {
return getFirst(LOGIN);
}
@@ -215,6 +219,7 @@ public class StompHeaders implements MultiValueMap<String, String>, Serializable
/**
* Get the passcode header.
*/
@Nullable
public String getPasscode() {
return getFirst(PASSCODE);
}
@@ -223,7 +228,7 @@ public class StompHeaders implements MultiValueMap<String, String>, Serializable
* Set the heartbeat header.
* Applies to the CONNECT and CONNECTED frames.
*/
public void setHeartbeat(long[] heartbeat) {
public void setHeartbeat(@Nullable long[] heartbeat) {
if (heartbeat == null || heartbeat.length != 2) {
throw new IllegalArgumentException("Heart-beat array must be of length 2, not " +
(heartbeat != null ? heartbeat.length : "null"));
@@ -238,6 +243,7 @@ public class StompHeaders implements MultiValueMap<String, String>, Serializable
/**
* Get the heartbeat header.
*/
@Nullable
public long[] getHeartbeat() {
String rawValue = getFirst(HEARTBEAT);
String[] rawValues = StringUtils.split(rawValue, ",");
@@ -267,6 +273,7 @@ public class StompHeaders implements MultiValueMap<String, String>, Serializable
/**
* Get the session header.
*/
@Nullable
public String getSession() {
return getFirst(SESSION);
}
@@ -283,6 +290,7 @@ public class StompHeaders implements MultiValueMap<String, String>, Serializable
* Get the server header.
* Applies to the CONNECTED frame.
*/
@Nullable
public String getServer() {
return getFirst(SERVER);
}
@@ -298,6 +306,7 @@ public class StompHeaders implements MultiValueMap<String, String>, Serializable
* Get the destination header.
* Applies to the SEND, SUBSCRIBE, and MESSAGE frames.
*/
@Nullable
public String getDestination() {
return getFirst(DESTINATION);
}
@@ -313,6 +322,7 @@ public class StompHeaders implements MultiValueMap<String, String>, Serializable
/**
* Get the id header.
*/
@Nullable
public String getId() {
return getFirst(ID);
}
@@ -328,6 +338,7 @@ public class StompHeaders implements MultiValueMap<String, String>, Serializable
/**
* Get the ack header.
*/
@Nullable
public String getAck() {
return getFirst(ACK);
}
@@ -343,6 +354,7 @@ public class StompHeaders implements MultiValueMap<String, String>, Serializable
/**
* Get the subscription header.
*/
@Nullable
public String getSubscription() {
return getFirst(SUBSCRIPTION);
}
@@ -358,6 +370,7 @@ public class StompHeaders implements MultiValueMap<String, String>, Serializable
/**
* Get the message-id header.
*/
@Nullable
public String getMessageId() {
return getFirst(MESSAGE_ID);
}
@@ -373,6 +386,7 @@ public class StompHeaders implements MultiValueMap<String, String>, Serializable
/**
* Get the receipt header.
*/
@Nullable
public String getReceiptId() {
return getFirst(RECEIPT_ID);
}
@@ -523,8 +537,8 @@ public class StompHeaders implements MultiValueMap<String, String>, Serializable
/**
* Return a {@code StompHeaders} object that can only be read, not written to.
*/
public static StompHeaders readOnlyStompHeaders(Map<String, List<String>> headers) {
return new StompHeaders(headers, true);
public static StompHeaders readOnlyStompHeaders(@Nullable Map<String, List<String>> headers) {
return new StompHeaders((headers != null ? headers : Collections.emptyMap()), true);
}
}

View File

@@ -134,6 +134,7 @@ public interface StompSession {
void addReceiptLostTask(Runnable runnable);
}
/**
* A handle to use to unsubscribe or to track a receipt.
*/
@@ -142,6 +143,7 @@ public interface StompSession {
/**
* Return the id for the subscription.
*/
@Nullable
String getSubscriptionId();
/**

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
package org.springframework.messaging.simp.stomp;
import org.springframework.lang.Nullable;
/**
* A contract for client STOMP session lifecycle events including a callback
* when the session is established and notifications of transport or message
@@ -52,8 +54,8 @@ public interface StompSessionHandler extends StompFrameHandler {
* @param payload the raw payload
* @param exception the exception
*/
void handleException(StompSession session, StompCommand command, StompHeaders headers,
byte[] payload, Throwable exception);
void handleException(StompSession session, @Nullable StompCommand command,
StompHeaders headers, byte[] payload, Throwable exception);
/**
* Handle a low level transport error which could be an I/O error or a

View File

@@ -30,13 +30,6 @@ import org.springframework.lang.Nullable;
*/
public abstract class StompSessionHandlerAdapter implements StompSessionHandler {
/**
* This implementation is empty.
*/
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
}
/**
* This implementation returns String as the expected payload type
* for STOMP ERROR frames.
@@ -57,8 +50,15 @@ public abstract class StompSessionHandlerAdapter implements StompSessionHandler
* This implementation is empty.
*/
@Override
public void handleException(StompSession session, StompCommand command, StompHeaders headers,
byte[] payload, Throwable exception) {
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
}
/**
* This implementation is empty.
*/
@Override
public void handleException(StompSession session, @Nullable StompCommand command,
StompHeaders headers, byte[] payload, Throwable exception) {
}
/**

View File

@@ -112,7 +112,7 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver {
* @param pathMatcher the PathMatcher used to work with destinations
* @since 4.3
*/
public void setPathMatcher(PathMatcher pathMatcher) {
public void setPathMatcher(@Nullable PathMatcher pathMatcher) {
if (pathMatcher != null) {
this.keepLeadingSlash = pathMatcher.combine("1", "2").equals("1/2");
}
@@ -148,15 +148,16 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver {
return null;
}
SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
switch (messageType) {
case SUBSCRIBE:
case UNSUBSCRIBE:
return parseSubscriptionMessage(message, sourceDestination);
case MESSAGE:
return parseMessage(headers, sourceDestination);
default:
return null;
if (messageType != null) {
switch (messageType) {
case SUBSCRIBE:
case UNSUBSCRIBE:
return parseSubscriptionMessage(message, sourceDestination);
case MESSAGE:
return parseMessage(headers, sourceDestination);
}
}
return null;
}
private ParseResult parseSubscriptionMessage(Message<?> message, String sourceDestination) {
@@ -174,18 +175,18 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver {
Principal principal = SimpMessageHeaderAccessor.getUser(headers);
String user = (principal != null ? principal.getName() : null);
Set<String> sessionIds = Collections.singleton(sessionId);
return new ParseResult(sourceDestination, actualDestination, sourceDestination,
sessionIds, user);
return new ParseResult(sourceDestination, actualDestination, sourceDestination, sessionIds, user);
}
private ParseResult parseMessage(MessageHeaders headers, String sourceDestination) {
private ParseResult parseMessage(MessageHeaders headers, String sourceDest) {
int prefixEnd = this.prefix.length();
int userEnd = sourceDestination.indexOf('/', prefixEnd);
int userEnd = sourceDest.indexOf('/', prefixEnd);
Assert.isTrue(userEnd > 0, "Expected destination pattern \"/user/{userId}/**\"");
String actualDestination = sourceDestination.substring(userEnd);
String subscribeDestination = this.prefix.substring(0, prefixEnd - 1) + actualDestination;
String userName = sourceDestination.substring(prefixEnd, userEnd);
String actualDest = sourceDest.substring(userEnd);
String subscribeDest = this.prefix.substring(0, prefixEnd - 1) + actualDest;
String userName = sourceDest.substring(prefixEnd, userEnd);
userName = StringUtils.replace(userName, "%2F", "/");
String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
Set<String> sessionIds;
if (userName.equals(sessionId)) {
@@ -195,14 +196,14 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver {
else {
sessionIds = getSessionIdsByUser(userName, sessionId);
}
if (!this.keepLeadingSlash) {
actualDestination = actualDestination.substring(1);
actualDest = actualDest.substring(1);
}
return new ParseResult(sourceDestination, actualDestination, subscribeDestination,
sessionIds, userName);
return new ParseResult(sourceDest, actualDest, subscribeDest, sessionIds, userName);
}
private Set<String> getSessionIdsByUser(String userName, String sessionId) {
private Set<String> getSessionIdsByUser(String userName, @Nullable String sessionId) {
Set<String> sessionIds;
SimpUser user = this.userRegistry.getUser(userName);
if (user != null) {
@@ -265,9 +266,8 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver {
private final String user;
public ParseResult(String sourceDest, String actualDest, String subscribeDest,
Set<String> sessionIds, String user) {
Set<String> sessionIds, @Nullable String user) {
this.sourceDestination = sourceDest;
this.actualDestination = actualDest;
@@ -276,7 +276,6 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver {
this.user = user;
}
public String getSourceDestination() {
return this.sourceDestination;
}
@@ -293,6 +292,7 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver {
return this.sessionIds;
}
@Nullable
public String getUser() {
return this.user;
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -41,15 +41,15 @@ public interface SimpUser {
/**
* Look up the session for the given id.
* @param sessionId the session id
* @return the matching session or {@code null}.
* @return the matching session, or {@code null} if none found
*/
@Nullable
SimpSession getSession(String sessionId);
SimpSession getSession(@Nullable String sessionId);
/**
* Return the sessions for the user.
* The returned set is a copy and will never be modified.
* @return a set of session ids, or an empty set.
* @return a set of session ids, or an empty set if none
*/
Set<SimpSession> getSessions();

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -114,6 +114,7 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
/**
* Return the configured destination for unresolved messages.
*/
@Nullable
public String getBroadcastDestination() {
return (this.broadcastHandler != null ? this.broadcastHandler.getBroadcastDestination() : null);
}
@@ -138,6 +139,7 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
/**
* Return the configured header initializer.
*/
@Nullable
public MessageHeaderInitializer getHeaderInitializer() {
return this.headerInitializer;
}
@@ -189,35 +191,40 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
@Override
public void handleMessage(Message<?> message) throws MessagingException {
Message<?> messageToUse = message;
if (this.broadcastHandler != null) {
message = this.broadcastHandler.preHandle(message);
if (message == null) {
messageToUse = this.broadcastHandler.preHandle(message);
if (messageToUse == null) {
return;
}
}
UserDestinationResult result = this.destinationResolver.resolveDestination(message);
UserDestinationResult result = this.destinationResolver.resolveDestination(messageToUse);
if (result == null) {
return;
}
if (result.getTargetDestinations().isEmpty()) {
if (logger.isTraceEnabled()) {
logger.trace("No active sessions for user destination: " + result.getSourceDestination());
}
if (this.broadcastHandler != null) {
this.broadcastHandler.handleUnresolved(message);
this.broadcastHandler.handleUnresolved(messageToUse);
}
return;
}
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(message);
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(messageToUse);
initHeaders(accessor);
accessor.setNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION, result.getSubscribeDestination());
accessor.setLeaveMutable(true);
message = MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
messageToUse = MessageBuilder.createMessage(messageToUse.getPayload(), accessor.getMessageHeaders());
if (logger.isTraceEnabled()) {
logger.trace("Translated " + result.getSourceDestination() + " -> " + result.getTargetDestinations());
}
for (String target : result.getTargetDestinations()) {
this.messagingTemplate.send(target, message);
this.messagingTemplate.send(target, messageToUse);
}
}
@@ -262,6 +269,7 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
}
SimpMessageHeaderAccessor accessor =
SimpMessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class);
Assert.state(accessor != null, "No SimpMessageHeaderAccessor");
if (accessor.getSessionId() == null) {
// Our own broadcast
return null;
@@ -277,7 +285,9 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
}
newAccessor.setNativeHeader(name, accessor.getFirstNativeHeader(name));
}
newAccessor.setDestination(destination);
if (destination != null) {
newAccessor.setDestination(destination);
}
newAccessor.setHeader(SimpMessageHeaderAccessor.IGNORE_ERROR, true); // ensure send doesn't block
return MessageBuilder.createMessage(message.getPayload(), newAccessor.getMessageHeaders());
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -40,8 +40,8 @@ public class UserDestinationResult {
private final String user;
public UserDestinationResult(String sourceDestination,
Set<String> targetDestinations, String subscribeDestination, String user) {
public UserDestinationResult(String sourceDestination, Set<String> targetDestinations,
String subscribeDestination, @Nullable String user) {
Assert.notNull(sourceDestination, "'sourceDestination' must not be null");
Assert.notNull(targetDestinations, "'targetDestinations' must not be null");
@@ -95,9 +95,10 @@ public class UserDestinationResult {
return this.user;
}
@Override
public String toString() {
return "UserDestinationResult[source=" + this.sourceDestination + ", target=" + this.targetDestinations +
return "UserDestinationResult [source=" + this.sourceDestination + ", target=" + this.targetDestinations +
", subscribeDestination=" + this.subscribeDestination + ", user=" + this.user + "]";
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -45,7 +45,7 @@ public abstract class AbstractHeaderMapper<T> implements HeaderMapper<T> {
* user-defined property that is being mapped into the MessageHeaders.
* The default is an empty String (no prefix).
*/
public void setInboundPrefix(String inboundPrefix) {
public void setInboundPrefix(@Nullable String inboundPrefix) {
this.inboundPrefix = (inboundPrefix != null ? inboundPrefix : "");
}
@@ -54,7 +54,7 @@ public abstract class AbstractHeaderMapper<T> implements HeaderMapper<T> {
* user-defined message header that is being mapped into the protocol-specific
* Message. The default is an empty String (no prefix).
*/
public void setOutboundPrefix(String outboundPrefix) {
public void setOutboundPrefix(@Nullable String outboundPrefix) {
this.outboundPrefix = (outboundPrefix != null ? outboundPrefix : "");
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -108,29 +108,30 @@ public abstract class AbstractMessageChannel implements MessageChannel, Intercep
@Override
public final boolean send(Message<?> message, long timeout) {
Assert.notNull(message, "Message must not be null");
Message<?> messageToUse = message;
ChannelInterceptorChain chain = new ChannelInterceptorChain();
boolean sent = false;
try {
message = chain.applyPreSend(message, this);
if (message == null) {
messageToUse = chain.applyPreSend(messageToUse, this);
if (messageToUse == null) {
return false;
}
sent = sendInternal(message, timeout);
chain.applyPostSend(message, this, sent);
chain.triggerAfterSendCompletion(message, this, sent, null);
sent = sendInternal(messageToUse, timeout);
chain.applyPostSend(messageToUse, this, sent);
chain.triggerAfterSendCompletion(messageToUse, this, sent, null);
return sent;
}
catch (Exception ex) {
chain.triggerAfterSendCompletion(message, this, sent, ex);
chain.triggerAfterSendCompletion(messageToUse, this, sent, ex);
if (ex instanceof MessagingException) {
throw (MessagingException) ex;
}
throw new MessageDeliveryException(message,"Failed to send message to " + this, ex);
throw new MessageDeliveryException(messageToUse,"Failed to send message to " + this, ex);
}
catch (Throwable err) {
MessageDeliveryException ex2 =
new MessageDeliveryException(message, "Failed to send message to " + this, err);
chain.triggerAfterSendCompletion(message, this, sent, ex2);
new MessageDeliveryException(messageToUse, "Failed to send message to " + this, err);
chain.triggerAfterSendCompletion(messageToUse, this, sent, ex2);
throw ex2;
}
}
@@ -203,16 +204,19 @@ public abstract class AbstractMessageChannel implements MessageChannel, Intercep
@Nullable
public Message<?> applyPostReceive(Message<?> message, MessageChannel channel) {
Message<?> messageToUse = message;
for (ChannelInterceptor interceptor : interceptors) {
message = interceptor.postReceive(message, channel);
if (message == null) {
messageToUse = interceptor.postReceive(messageToUse, channel);
if (messageToUse == null) {
return null;
}
}
return message;
return messageToUse;
}
public void triggerAfterReceiveCompletion(@Nullable Message<?> message, MessageChannel channel, @Nullable Exception ex) {
public void triggerAfterReceiveCompletion(
@Nullable Message<?> message, MessageChannel channel, @Nullable Exception ex) {
for (int i = this.receiveInterceptorIndex; i >= 0; i--) {
ChannelInterceptor interceptor = interceptors.get(i);
try {

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -53,7 +53,7 @@ public interface ChannelInterceptor {
* completed and returned a Message, i.e. it did not return {@code null}.
* @since 4.1
*/
void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex);
void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, @Nullable Exception ex);
/**
* Invoked as soon as receive is called and before a Message is
@@ -65,8 +65,10 @@ public interface ChannelInterceptor {
/**
* Invoked immediately after a Message has been retrieved but before
* it is returned to the caller. The Message may be modified if
* necessary. This only applies to PollableChannels.
* necessary; {@code null} aborts further interceptor invocations.
* This only applies to PollableChannels.
*/
@Nullable
Message<?> postReceive(Message<?> message, MessageChannel channel);
/**
@@ -76,6 +78,6 @@ public interface ChannelInterceptor {
* completed and returned {@code true}.
* @since 4.1
*/
void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex);
void afterReceiveCompletion(@Nullable Message<?> message, MessageChannel channel, @Nullable Exception ex);
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -57,6 +57,6 @@ public interface ExecutorChannelInterceptor extends ChannelInterceptor {
* @param handler the target handler that handled the message
* @param ex any exception that may been raised by the handler
*/
void afterMessageHandled(Message<?> message, MessageChannel channel, MessageHandler handler, Exception ex);
void afterMessageHandled(Message<?> message, MessageChannel channel, MessageHandler handler, @Nullable Exception ex);
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -154,9 +154,10 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel {
@Nullable
private Message<?> applyBeforeHandle(Message<?> message) {
Message<?> messageToUse = message;
for (ExecutorChannelInterceptor interceptor : executorInterceptors) {
message = interceptor.beforeHandle(message, ExecutorSubscribableChannel.this, this.messageHandler);
if (message == null) {
messageToUse = interceptor.beforeHandle(messageToUse, ExecutorSubscribableChannel.this, this.messageHandler);
if (messageToUse == null) {
String name = interceptor.getClass().getSimpleName();
if (logger.isDebugEnabled()) {
logger.debug(name + " returned null from beforeHandle, i.e. precluding the send.");
@@ -166,7 +167,7 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel {
}
this.interceptorIndex++;
}
return message;
return messageToUse;
}
private void triggerAfterMessageHandled(Message<?> message, @Nullable Exception ex) {

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -83,7 +83,10 @@ public class IdTimestampMessageHeaderInitializer implements MessageHeaderInitial
@Override
public void initHeaders(MessageHeaderAccessor headerAccessor) {
headerAccessor.setIdGenerator(getIdGenerator());
IdGenerator idGenerator = getIdGenerator();
if (idGenerator != null) {
headerAccessor.setIdGenerator(idGenerator);
}
headerAccessor.setEnableTimestamp(isEnableTimestamp());
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -110,7 +110,7 @@ public final class MessageBuilder<T> {
* existing values. Use { {@link #copyHeadersIfAbsent(Map)} to avoid overwriting
* values. Note that the 'id' and 'timestamp' header values will never be overwritten.
*/
public MessageBuilder<T> copyHeaders(Map<String, ?> headersToCopy) {
public MessageBuilder<T> copyHeaders(@Nullable Map<String, ?> headersToCopy) {
this.headerAccessor.copyHeaders(headersToCopy);
return this;
}
@@ -119,7 +119,7 @@ public final class MessageBuilder<T> {
* Copy the name-value pairs from the provided Map. This operation will <em>not</em>
* overwrite any existing values.
*/
public MessageBuilder<T> copyHeadersIfAbsent(Map<String, ?> headersToCopy) {
public MessageBuilder<T> copyHeadersIfAbsent(@Nullable Map<String, ?> headersToCopy) {
this.headerAccessor.copyHeadersIfAbsent(headersToCopy);
return this;
}

View File

@@ -315,7 +315,7 @@ public class MessageHeaderAccessor {
}
}
protected void verifyType(String headerName, Object headerValue) {
protected void verifyType(@Nullable String headerName, @Nullable Object headerValue) {
if (headerName != null && headerValue != null) {
if (MessageHeaders.ERROR_CHANNEL.equals(headerName) ||
MessageHeaders.REPLY_CHANNEL.endsWith(headerName)) {
@@ -368,7 +368,7 @@ public class MessageHeaderAccessor {
}
}
private List<String> getMatchingHeaderNames(String pattern, Map<String, Object> headers) {
private List<String> getMatchingHeaderNames(String pattern, @Nullable Map<String, Object> headers) {
List<String> matchingHeaderNames = new ArrayList<>();
if (headers != null) {
for (String key : headers.keySet()) {
@@ -385,7 +385,7 @@ public class MessageHeaderAccessor {
* <p>This operation will overwrite any existing values. Use
* {@link #copyHeadersIfAbsent(Map)} to avoid overwriting values.
*/
public void copyHeaders(Map<String, ?> headersToCopy) {
public void copyHeaders(@Nullable Map<String, ?> headersToCopy) {
if (headersToCopy != null) {
for (Map.Entry<String, ?> entry : headersToCopy.entrySet()) {
if (!isReadOnly(entry.getKey())) {
@@ -399,7 +399,7 @@ public class MessageHeaderAccessor {
* Copy the name-value pairs from the provided Map.
* <p>This operation will <em>not</em> overwrite any existing values.
*/
public void copyHeadersIfAbsent(Map<String, ?> headersToCopy) {
public void copyHeadersIfAbsent(@Nullable Map<String, ?> headersToCopy) {
if (headersToCopy != null) {
for (Map.Entry<String, ?> entry : headersToCopy.entrySet()) {
if (!isReadOnly(entry.getKey())) {
@@ -447,6 +447,12 @@ public class MessageHeaderAccessor {
return (value instanceof MimeType ? (MimeType) value : MimeType.valueOf(value.toString()));
}
private Charset getCharset() {
MimeType contentType = getContentType();
Charset charset = (contentType != null ? contentType.getCharset() : null);
return (charset != null ? charset : DEFAULT_CHARSET);
}
public void setReplyChannelName(String replyChannelName) {
setHeader(MessageHeaders.REPLY_CHANNEL, replyChannelName);
}
@@ -455,6 +461,7 @@ public class MessageHeaderAccessor {
setHeader(MessageHeaders.REPLY_CHANNEL, replyChannel);
}
@Nullable
public Object getReplyChannel() {
return getHeader(MessageHeaders.REPLY_CHANNEL);
}
@@ -467,7 +474,8 @@ public class MessageHeaderAccessor {
setHeader(MessageHeaders.ERROR_CHANNEL, errorChannel);
}
public Object getErrorChannel() {
@Nullable
public Object getErrorChannel() {
return getHeader(MessageHeaders.ERROR_CHANNEL);
}
@@ -502,11 +510,9 @@ public class MessageHeaderAccessor {
else if (payload instanceof byte[]) {
byte[] bytes = (byte[]) payload;
if (isReadableContentType()) {
Charset charset = getContentType().getCharset();
charset = (charset != null ? charset : DEFAULT_CHARSET);
return (bytes.length < 80) ?
" payload=" + new String(bytes, charset) :
" payload=" + new String(Arrays.copyOf(bytes, 80), charset) + "...(truncated)";
" payload=" + new String(bytes, getCharset()) :
" payload=" + new String(Arrays.copyOf(bytes, 80), getCharset()) + "...(truncated)";
}
else {
return " payload=byte[" + bytes.length + "]";
@@ -520,16 +526,14 @@ public class MessageHeaderAccessor {
}
}
protected String getDetailedPayloadLogMessage(Object payload) {
protected String getDetailedPayloadLogMessage(@Nullable Object payload) {
if (payload instanceof String) {
return " payload=" + payload;
}
else if (payload instanceof byte[]) {
byte[] bytes = (byte[]) payload;
if (isReadableContentType()) {
Charset charset = getContentType().getCharset();
charset = (charset != null ? charset : DEFAULT_CHARSET);
return " payload=" + new String(bytes, charset);
return " payload=" + new String(bytes, getCharset());
}
else {
return " payload=byte[" + bytes.length + "]";
@@ -563,11 +567,13 @@ public class MessageHeaderAccessor {
* its type does not match the required type.
* <p>This is for cases where the existence of an accessor is strongly expected
* (followed up with an assertion) or where an accessor will be created otherwise.
* @param message the message to get an accessor for
* @param requiredType the required accessor type (or {@code null} for any)
* @return an accessor instance of the specified type, or {@code null} if none
* @since 4.1
*/
@Nullable
public static <T extends MessageHeaderAccessor> T getAccessor(Message<?> message, Class<T> requiredType) {
public static <T extends MessageHeaderAccessor> T getAccessor(Message<?> message, @Nullable Class<T> requiredType) {
return getAccessor(message.getHeaders(), requiredType);
}
@@ -575,18 +581,20 @@ public class MessageHeaderAccessor {
* A variation of {@link #getAccessor(org.springframework.messaging.Message, Class)}
* with a {@code MessageHeaders} instance instead of a {@code Message}.
* <p>This is for cases when a full message may not have been created yet.
* @param messageHeaders the message headers to get an accessor for
* @param requiredType the required accessor type (or {@code null} for any)
* @return an accessor instance of the specified type, or {@code null} if none
* @since 4.1
*/
@SuppressWarnings("unchecked")
@Nullable
public static <T extends MessageHeaderAccessor> T getAccessor(
MessageHeaders messageHeaders, Class<T> requiredType) {
MessageHeaders messageHeaders, @Nullable Class<T> requiredType) {
if (messageHeaders instanceof MutableMessageHeaders) {
MutableMessageHeaders mutableHeaders = (MutableMessageHeaders) messageHeaders;
MessageHeaderAccessor headerAccessor = mutableHeaders.getAccessor();
if (requiredType.isAssignableFrom(headerAccessor.getClass())) {
if (requiredType == null || requiredType.isInstance(headerAccessor)) {
return (T) headerAccessor;
}
}
@@ -606,9 +614,7 @@ public class MessageHeaderAccessor {
if (message.getHeaders() instanceof MutableMessageHeaders) {
MutableMessageHeaders mutableHeaders = (MutableMessageHeaders) message.getHeaders();
MessageHeaderAccessor accessor = mutableHeaders.getAccessor();
if (accessor != null) {
return (accessor.isMutable() ? accessor : accessor.createAccessor(message));
}
return (accessor.isMutable() ? accessor : accessor.createAccessor(message));
}
return new MessageHeaderAccessor(message);
}
@@ -619,7 +625,7 @@ public class MessageHeaderAccessor {
private boolean mutable = true;
public MutableMessageHeaders(Map<String, Object> headers) {
public MutableMessageHeaders(@Nullable Map<String, Object> headers) {
super(headers, MessageHeaders.ID_VALUE_NONE, -1L);
}
@@ -638,7 +644,7 @@ public class MessageHeaderAccessor {
IdGenerator idGenerator = (MessageHeaderAccessor.this.idGenerator != null ?
MessageHeaderAccessor.this.idGenerator : MessageHeaders.getIdGenerator());
UUID id = idGenerator.generateId();
if (id != null && id != MessageHeaders.ID_VALUE_NONE) {
if (id != MessageHeaders.ID_VALUE_NONE) {
getRawHeaders().put(ID, id);
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -72,7 +72,7 @@ public class NativeMessageHeaderAccessor extends MessageHeaderAccessor {
/**
* A protected constructor accepting the headers of an existing message to copy.
*/
protected NativeMessageHeaderAccessor(Message<?> message) {
protected NativeMessageHeaderAccessor(@Nullable Message<?> message) {
super(message);
if (message != null) {
@SuppressWarnings("unchecked")
@@ -86,7 +86,8 @@ public class NativeMessageHeaderAccessor extends MessageHeaderAccessor {
}
@SuppressWarnings("unchecked")
private Map<String, List<String>> getNativeHeaders() {
@Nullable
protected Map<String, List<String>> getNativeHeaders() {
return (Map<String, List<String>>) getHeader(NATIVE_HEADERS);
}
@@ -105,7 +106,7 @@ public class NativeMessageHeaderAccessor extends MessageHeaderAccessor {
if (map != null) {
// Force removal since setHeader checks for equality
removeHeader(NATIVE_HEADERS);
setHeader(NATIVE_HEADERS, Collections.<String, List<String>>unmodifiableMap(map));
setHeader(NATIVE_HEADERS, Collections.unmodifiableMap(map));
}
super.setImmutable();
}
@@ -120,7 +121,8 @@ public class NativeMessageHeaderAccessor extends MessageHeaderAccessor {
}
/**
* @return all values for the specified native header or {@code null}.
* Return all values for the specified native header.
* or {@code null} if none.
*/
@Nullable
public List<String> getNativeHeader(String headerName) {
@@ -129,7 +131,8 @@ public class NativeMessageHeaderAccessor extends MessageHeaderAccessor {
}
/**
* @return the first value for the specified native header or {@code null}.
* Return the first value for the specified native header,
* or {@code null} if none.
*/
@Nullable
public String getFirstNativeHeader(String headerName) {
@@ -146,7 +149,7 @@ public class NativeMessageHeaderAccessor extends MessageHeaderAccessor {
/**
* Set the specified native header value replacing existing values.
*/
public void setNativeHeader(String name, String value) {
public void setNativeHeader(String name, @Nullable String value) {
Assert.state(isMutable(), "Already immutable");
Map<String, List<String>> map = getNativeHeaders();
if (value == null) {
@@ -171,7 +174,7 @@ public class NativeMessageHeaderAccessor extends MessageHeaderAccessor {
/**
* Add the specified native header value to existing values.
*/
public void addNativeHeader(String name, String value) {
public void addNativeHeader(String name, @Nullable String value) {
Assert.state(isMutable(), "Already immutable");
if (value == null) {
return;
@@ -190,7 +193,7 @@ public class NativeMessageHeaderAccessor extends MessageHeaderAccessor {
setModified(true);
}
public void addNativeHeaders(MultiValueMap<String, String> headers) {
public void addNativeHeaders(@Nullable MultiValueMap<String, String> headers) {
if (headers == null) {
return;
}

View File

@@ -30,7 +30,7 @@ public interface ReconnectStrategy {
/**
* Return the time to the next attempt to reconnect.
* @param attemptCount how many reconnect attempts have been made already
* @return the amount of time in milliseconds or {@code null} to stop
* @return the amount of time in milliseconds, or {@code null} to stop
*/
@Nullable
Long getTimeToNextAttempt(int attemptCount);

View File

@@ -22,6 +22,7 @@ import java.util.List;
import io.netty.buffer.ByteBuf;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
/**

View File

@@ -20,6 +20,7 @@ import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -176,8 +177,8 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
private <T> Function<Flux<T>, Publisher<?>> reconnectFunction(ReconnectStrategy reconnectStrategy) {
return flux -> flux
.scan(1, (count, element) -> count++)
.flatMap(attempt -> Mono.delay(
Duration.ofMillis(reconnectStrategy.getTimeToNextAttempt(attempt))));
.flatMap(attempt -> Optional.ofNullable(reconnectStrategy.getTimeToNextAttempt(attempt))
.map(time -> Mono.delay(Duration.ofMillis(time))).orElse(Mono.empty()));
}
@Override