Polishing

This commit is contained in:
Juergen Hoeller
2015-07-21 22:58:34 +02:00
parent 1a636b1023
commit edd6e76b9f
43 changed files with 244 additions and 262 deletions

View File

@@ -18,7 +18,6 @@ package org.springframework.messaging.converter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
@@ -32,9 +31,7 @@ import org.springframework.beans.TypeMismatchException;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.oxm.Marshaller;
import org.springframework.oxm.MarshallingFailureException;
import org.springframework.oxm.Unmarshaller;
import org.springframework.oxm.UnmarshallingFailureException;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
@@ -61,8 +58,7 @@ public class MarshallingMessageConverter extends AbstractMessageConverter {
* {@link #setUnmarshaller(Unmarshaller)} to be invoked separately.
*/
public MarshallingMessageConverter() {
this(new MimeType("application", "xml"), new MimeType("text", "xml"),
new MimeType("application", "*+xml"));
this(new MimeType("application", "xml"), new MimeType("text", "xml"), new MimeType("application", "*+xml"));
}
/**
@@ -76,10 +72,8 @@ public class MarshallingMessageConverter extends AbstractMessageConverter {
/**
* Constructor with {@link Marshaller}. If the given {@link Marshaller} also
* implements {@link Unmarshaller}, it is also used for unmarshalling.
*
* <p>Note that all {@code Marshaller} implementations in Spring also implement
* {@code Unmarshaller} so that you can safely use this constructor.
*
* @param marshaller object used as marshaller and unmarshaller
*/
public MarshallingMessageConverter(Marshaller marshaller) {
@@ -144,17 +138,13 @@ public class MarshallingMessageConverter extends AbstractMessageConverter {
Assert.notNull(this.unmarshaller, "Property 'unmarshaller' is required");
try {
Source source = getSource(message.getPayload());
Object result = this.unmarshaller.unmarshal(source);
if (!targetClass.isInstance(result)) {
throw new TypeMismatchException(result, targetClass);
}
return result;
}
catch (UnmarshallingFailureException ex) {
throw new MessageConversionException(message, "Could not unmarshal XML: " + ex.getMessage(), ex);
}
catch (IOException ex) {
catch (Exception ex) {
throw new MessageConversionException(message, "Could not unmarshal XML: " + ex.getMessage(), ex);
}
}
@@ -175,26 +165,20 @@ public class MarshallingMessageConverter extends AbstractMessageConverter {
if (byte[].class == getSerializedPayloadClass()) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Result result = new StreamResult(out);
this.marshaller.marshal(payload, result);
payload = out.toByteArray();
}
else {
Writer writer = new StringWriter();
Result result = new StreamResult(writer);
this.marshaller.marshal(payload, result);
payload = writer.toString();
}
}
catch (MarshallingFailureException ex) {
throw new MessageConversionException("Could not marshal XML: " + ex.getMessage(), ex);
}
catch (IOException ex) {
catch (Exception ex) {
throw new MessageConversionException("Could not marshal XML: " + ex.getMessage(), ex);
}
return payload;
}
}

View File

@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.handler;
import org.springframework.core.Ordered;

View File

@@ -21,7 +21,7 @@ import org.springframework.messaging.Message;
/**
* Convenient base class for {@link AsyncHandlerMethodReturnValueHandler}
* implementations that support only asynchronous (Future-like) return values a
* implementations that support only asynchronous (Future-like) return values
* and merely serve as adapters of such types to Spring's
* {@link org.springframework.util.concurrent.ListenableFuture ListenableFuture}.
*
@@ -38,7 +38,7 @@ public abstract class AbstractAsyncReturnValueHandler implements AsyncHandlerMet
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType, Message<?> message) {
// Should never be called since we return "true" from isAsyncReturnValue
throw new IllegalStateException("Unexpected invocation.");
throw new IllegalStateException("Unexpected invocation");
}
}

View File

@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.handler.invocation;
import org.springframework.core.MethodParameter;
@@ -38,11 +39,9 @@ public interface AsyncHandlerMethodReturnValueHandler extends HandlerMethodRetur
* with success and error callbacks. If this method returns {@code true},
* then {@link #toListenableFuture} is invoked next. If it returns
* {@code false}, then {@link #handleReturnValue} is called.
*
* <p><strong>Note:</strong> this method will only be invoked after
* {@link #supportsReturnType(org.springframework.core.MethodParameter)}
* is called and it returns {@code true}.
*
* @param returnValue the value returned from the handler method
* @param returnType the type of the return value.
* @return true if the return value type represents an async value.
@@ -55,11 +54,9 @@ public interface AsyncHandlerMethodReturnValueHandler extends HandlerMethodRetur
* {@link org.springframework.util.concurrent.SettableListenableFuture
* SettableListenableFuture}. Return value handling will then continue when
* the ListenableFuture is completed with either success or error.
*
* <p><strong>Note:</strong> this method will only be invoked after
* {@link #supportsReturnType(org.springframework.core.MethodParameter)}
* is called and it returns {@code true}.
*
* @param returnValue the value returned from the handler method
* @param returnType the type of the return value.
* @return the resulting ListenableFuture or {@code null} in which case no
@@ -67,4 +64,4 @@ public interface AsyncHandlerMethodReturnValueHandler extends HandlerMethodRetur
*/
ListenableFuture<?> toListenableFuture(Object returnValue, MethodParameter returnType);
}
}

View File

@@ -43,4 +43,4 @@ public class CompletableFutureReturnValueHandler extends AbstractAsyncReturnValu
return new CompletableToListenableFutureAdapter<Object>((CompletableFuture<Object>) returnValue);
}
}
}

View File

@@ -121,7 +121,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
* @since 4.2
*/
public void setTaskScheduler(TaskScheduler taskScheduler) {
Assert.notNull(taskScheduler);
Assert.notNull(taskScheduler, "TaskScheduler must not be null");
this.taskScheduler = taskScheduler;
if (this.heartbeatValue == null) {
this.heartbeatValue = new long[] {10000, 10000};
@@ -185,7 +185,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
else {
Assert.isTrue(getHeartbeatValue() == null ||
(getHeartbeatValue()[0] == 0 && getHeartbeatValue()[1] == 0),
"Heartbeat values configured but no TaskScheduler is provided.");
"Heartbeat values configured but no TaskScheduler provided");
}
}
@@ -328,7 +328,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
@Override
public String toString() {
return "SimpleBroker[" + this.subscriptionRegistry + "]";
return "SimpleBrokerMessageHandler [" + this.subscriptionRegistry + "]";
}
@@ -337,7 +337,6 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
/* STOMP spec: receiver SHOULD take into account an error margin */
private static final long HEARTBEAT_MULTIPLIER = 3;
private final String sessiondId;
private final Principal user;
@@ -350,7 +349,6 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
private volatile long lastWriteTime;
public SessionInfo(String sessiondId, Principal user, long[] clientHeartbeat, long[] serverHeartbeat) {
this.sessiondId = sessiondId;
this.user = user;
@@ -400,6 +398,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
}
}
private class HeartbeatTask implements Runnable {
@Override
@@ -420,4 +419,5 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
}
}
}
}

View File

@@ -13,12 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.stomp;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.util.concurrent.ListenableFuture;
/**
* A {@link StompSession} that implements
* {@link org.springframework.messaging.tcp.TcpConnectionHandler

View File

@@ -13,8 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.stomp;
package org.springframework.messaging.simp.stomp;
/**
* Raised when the connection for a STOMP session is lost rather than closed.
@@ -25,7 +25,6 @@ package org.springframework.messaging.simp.stomp;
@SuppressWarnings("serial")
public class ConnectionLostException extends Exception {
public ConnectionLostException(String message) {
super(message);
}

View File

@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.stomp;
import java.lang.reflect.Type;
@@ -47,7 +48,6 @@ import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.SettableListenableFuture;
/**
* Default implementation of {@link ConnectionHandlingStompSession}.
*
@@ -56,7 +56,7 @@ import org.springframework.util.concurrent.SettableListenableFuture;
*/
public class DefaultStompSession implements ConnectionHandlingStompSession {
private static Log logger = LogFactory.getLog(DefaultStompSession.class);
private static final Log logger = LogFactory.getLog(DefaultStompSession.class);
private static final IdGenerator idGenerator = new AlternativeJdkIdGenerator();
@@ -88,7 +88,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
private long receiptTimeLimit = 15 * 1000;
private volatile boolean autoReceiptEnabled;
private volatile boolean autoReceiptEnabled;
private volatile TcpConnection<byte[]> connection;
@@ -107,7 +107,6 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
/**
* Create a new session.
*
* @param sessionHandler the application handler for the session
* @param connectHeaders headers for the STOMP CONNECT frame
*/
@@ -201,7 +200,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
@Override
public boolean isConnected() {
return this.connection != null;
return (this.connection != null);
}
@Override
@@ -331,6 +330,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
}
}
// TcpConnectionHandler
@Override
@@ -475,7 +475,6 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
}
private class ReceiptHandler implements Receiptable {
private final String receiptId;
@@ -488,7 +487,6 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
private Boolean result;
public ReceiptHandler(String receiptId) {
this.receiptId = receiptId;
if (this.receiptId != null) {
@@ -574,6 +572,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
}
}
private class DefaultSubscription extends ReceiptHandler implements Subscription {
private final String id;
@@ -582,7 +581,6 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
private final StompFrameHandler handler;
public DefaultSubscription(String id, String destination, String receiptId, StompFrameHandler handler) {
super(receiptId);
Assert.notNull(destination, "'destination' is required");
@@ -620,6 +618,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
}
}
private class WriteInactivityTask implements Runnable {
@Override
@@ -638,6 +637,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
}
}
private class ReadInactivityTask implements Runnable {
@Override
@@ -652,4 +652,4 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
}
}
}
}

View File

@@ -13,12 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.stomp;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.Reactor2TcpClient;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import reactor.Environment;
import reactor.core.config.ConfigurationReader;
import reactor.core.config.DispatcherConfiguration;
@@ -27,9 +28,10 @@ import reactor.core.config.ReactorConfiguration;
import reactor.io.net.NetStreams;
import reactor.io.net.Spec.TcpClientSpec;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.Reactor2TcpClient;
import org.springframework.util.concurrent.ListenableFuture;
/**
* A STOMP over TCP client that uses
@@ -73,8 +75,7 @@ public class Reactor2TcpStompClient extends StompClientSupport {
/**
* Connect and notify the given {@link StompSessionHandler} when connected
* on the STOMP level,
*
* on the STOMP level.
* @param handler the handler for the STOMP session
* @return ListenableFuture for access to the session when ready for use
*/
@@ -85,9 +86,8 @@ public class Reactor2TcpStompClient extends StompClientSupport {
/**
* An overloaded version of {@link #connect(StompSessionHandler)} that
* accepts headers to use for the STOMP CONNECT frame.
*
* @param connectHeaders headers to add to the CONNECT frame
* @param handler the handler for the STOMP session
* @param handler the handler for the STOMP session
* @return ListenableFuture for access to the session when ready for use
*/
public ListenableFuture<StompSession> connect(StompHeaders connectHeaders, StompSessionHandler handler) {
@@ -119,6 +119,7 @@ public class Reactor2TcpStompClient extends StompClientSupport {
}
}
private static class StompTcpClientSpecFactory
implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {

View File

@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.stomp;
import java.util.Arrays;
@@ -22,11 +23,10 @@ import org.springframework.messaging.converter.SimpleMessageConverter;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
/**
* Base class for STOMP client implementations.
*
* <p>Sub-classes can connect over WebSocket or TCP using any library.
* <p>Subclasses can connect over WebSocket or TCP using any library.
* When creating a new connection a sub-class can create an instance of
* {@link DefaultStompSession} which extends
* {@link org.springframework.messaging.tcp.TcpConnectionHandler
@@ -71,12 +71,10 @@ public abstract class StompClientSupport {
/**
* Configure a scheduler to use for heartbeats and for receipt tracking.
*
* <p><strong>Note:</strong> some transports have built-in support to work
* with heartbeats and therefore do not require a TaskScheduler.
* Receipts however, if needed, do require a TaskScheduler to be configured.
*
* <p>By default this is not set.
* <p>By default, this is not set.
*/
public void setTaskScheduler(TaskScheduler taskScheduler) {
this.taskScheduler = taskScheduler;
@@ -99,7 +97,7 @@ public abstract class StompClientSupport {
* TaskScheduler to be configured first.
* @param heartbeat the value for the CONNECT "heart-beat" header
* @see <a href="http://stomp.github.io/stomp-specification-1.2.html#Heart-beating">
* http://stomp.github.io/stomp-specification-1.2.html#Heart-beating</a>
* http://stomp.github.io/stomp-specification-1.2.html#Heart-beating</a>
*/
public void setDefaultHeartbeat(long[] heartbeat) {
Assert.notNull(heartbeat);
@@ -136,7 +134,7 @@ public abstract class StompClientSupport {
* Return the configured receipt time limit.
*/
public long getReceiptTimeLimit() {
return receiptTimeLimit;
return this.receiptTimeLimit;
}

View File

@@ -13,11 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.stomp;
import java.lang.reflect.Type;
/**
* Contract to handle a STOMP frame.
*
@@ -36,7 +36,6 @@ public interface StompFrameHandler {
/**
* Handle a STOMP frame with the payload converted to the target type returned
* from {@link #getPayloadType(StompHeaders)}.
*
* @param headers the headers of the frame
* @param payload the payload or {@code null} if there was no payload
*/

View File

@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.stomp;
import java.io.Serializable;
@@ -45,7 +46,7 @@ import org.springframework.util.StringUtils;
* @author Rossen Stoyanchev
* @since 4.2
* @see <a href="http://stomp.github.io/stomp-specification-1.2.html#Frames_and_Headers">
* http://stomp.github.io/stomp-specification-1.2.html#Frames_and_Headers</a>
* http://stomp.github.io/stomp-specification-1.2.html#Frames_and_Headers</a>
*/
public class StompHeaders implements MultiValueMap<String, String>, Serializable {

View File

@@ -13,8 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.stomp;
package org.springframework.messaging.simp.stomp;
/**
* Represents a STOMP session with operations to send messages, create
@@ -41,7 +41,6 @@ public interface StompSession {
* the server to return a RECEIPT. An application can then use the
* {@link StompSession.Receiptable
* Receiptable} returned from the operation to track the receipt.
*
* <p>A receipt header can also be added manually through the overloaded
* methods that accept {@code StompHeaders}.
*/
@@ -117,7 +116,6 @@ public interface StompSession {
* @see org.springframework.messaging.simp.stomp.StompClientSupport#setReceiptTimeLimit(long)
*/
void addReceiptLostTask(Runnable runnable);
}
/**
@@ -134,7 +132,6 @@ public interface StompSession {
* Remove the subscription by sending an UNSUBSCRIBE frame.
*/
void unsubscribe();
}
}

View File

@@ -13,8 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.stomp;
package org.springframework.messaging.simp.stomp;
/**
* A contract for client STOMP session lifecycle events including a callback

View File

@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.stomp;
import java.lang.reflect.Type;

View File

@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.user;
import java.net.InetAddress;
@@ -66,6 +67,7 @@ public class MultiServerUserRegistry implements SimpUserRegistry, SmartApplicati
this.id = generateId();
}
private static String generateId() {
String host;
try {
@@ -171,7 +173,6 @@ public class MultiServerUserRegistry implements SimpUserRegistry, SmartApplicati
private long expirationTime;
public UserRegistryDto() {
}
@@ -233,6 +234,7 @@ public class MultiServerUserRegistry implements SimpUserRegistry, SmartApplicati
}
}
@SuppressWarnings("unused")
private static class SimpUserDto implements SimpUser {
@@ -240,7 +242,6 @@ public class MultiServerUserRegistry implements SimpUserRegistry, SmartApplicati
private Set<SimpSessionDto> sessions;
public SimpUserDto() {
this.sessions = new HashSet<SimpSessionDto>(1);
}
@@ -254,27 +255,18 @@ public class MultiServerUserRegistry implements SimpUserRegistry, SmartApplicati
}
}
@Override
public String getName() {
return this.name;
}
public void setName(String name) {
this.name = name;
}
@Override
public boolean hasSessions() {
return !this.sessions.isEmpty();
public String getName() {
return this.name;
}
@Override
public Set<SimpSession> getSessions() {
return new HashSet<SimpSession>(this.sessions);
}
public void setSessions(Set<SimpSessionDto> sessions) {
this.sessions.addAll(sessions);
public boolean hasSessions() {
return !this.sessions.isEmpty();
}
@Override
@@ -287,6 +279,15 @@ public class MultiServerUserRegistry implements SimpUserRegistry, SmartApplicati
return null;
}
public void setSessions(Set<SimpSessionDto> sessions) {
this.sessions.addAll(sessions);
}
@Override
public Set<SimpSession> getSessions() {
return new HashSet<SimpSession>(this.sessions);
}
private void restoreParentReferences() {
for (SimpSessionDto session : this.sessions) {
session.setUser(this);
@@ -296,13 +297,7 @@ public class MultiServerUserRegistry implements SimpUserRegistry, SmartApplicati
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || !(other instanceof SimpUser)) {
return false;
}
return this.name.equals(((SimpUser) other).getName());
return (this == other || (other instanceof SimpUser && this.name.equals(((SimpUser) other).getName())));
}
@Override
@@ -316,6 +311,7 @@ public class MultiServerUserRegistry implements SimpUserRegistry, SmartApplicati
}
}
@SuppressWarnings("unused")
private static class SimpSessionDto implements SimpSession {
@@ -323,8 +319,7 @@ public class MultiServerUserRegistry implements SimpUserRegistry, SmartApplicati
private SimpUserDto user;
private Set<SimpSubscriptionDto> subscriptions;
private final Set<SimpSubscriptionDto> subscriptions;
public SimpSessionDto() {
this.subscriptions = new HashSet<SimpSubscriptionDto>(4);
@@ -339,18 +334,13 @@ public class MultiServerUserRegistry implements SimpUserRegistry, SmartApplicati
}
}
@Override
public String getId() {
return this.id;
}
public void setId(String id) {
this.id = id;
}
@Override
public SimpUserDto getUser() {
return this.user;
public String getId() {
return this.id;
}
public void setUser(SimpUserDto user) {
@@ -358,14 +348,19 @@ public class MultiServerUserRegistry implements SimpUserRegistry, SmartApplicati
}
@Override
public Set<SimpSubscription> getSubscriptions() {
return new HashSet<SimpSubscription>(this.subscriptions);
public SimpUserDto getUser() {
return this.user;
}
public void setSubscriptions(Set<SimpSubscriptionDto> subscriptions) {
this.subscriptions.addAll(subscriptions);
}
@Override
public Set<SimpSubscription> getSubscriptions() {
return new HashSet<SimpSubscription>(this.subscriptions);
}
private void restoreParentReferences() {
for (SimpSubscriptionDto subscription : this.subscriptions) {
subscription.setSession(this);
@@ -373,19 +368,13 @@ public class MultiServerUserRegistry implements SimpUserRegistry, SmartApplicati
}
@Override
public int hashCode() {
return this.id.hashCode();
public boolean equals(Object other) {
return (this == other || (other instanceof SimpSession && this.id.equals(((SimpSession) other).getId())));
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || !(other instanceof SimpSession)) {
return false;
}
return this.id.equals(((SimpSession) other).getId());
public int hashCode() {
return this.id.hashCode();
}
@Override
@@ -394,6 +383,7 @@ public class MultiServerUserRegistry implements SimpUserRegistry, SmartApplicati
}
}
@SuppressWarnings("unused")
private static class SimpSubscriptionDto implements SimpSubscription {
@@ -403,7 +393,6 @@ public class MultiServerUserRegistry implements SimpUserRegistry, SmartApplicati
private String destination;
public SimpSubscriptionDto() {
}
@@ -412,18 +401,13 @@ public class MultiServerUserRegistry implements SimpUserRegistry, SmartApplicati
this.destination = subscription.getDestination();
}
@Override
public String getId() {
return this.id;
}
public void setId(String id) {
this.id = id;
}
@Override
public SimpSessionDto getSession() {
return this.session;
public String getId() {
return this.id;
}
public void setSession(SimpSessionDto session) {
@@ -431,8 +415,8 @@ public class MultiServerUserRegistry implements SimpUserRegistry, SmartApplicati
}
@Override
public String getDestination() {
return this.destination;
public SimpSessionDto getSession() {
return this.session;
}
public void setDestination(String destination) {
@@ -440,8 +424,8 @@ public class MultiServerUserRegistry implements SimpUserRegistry, SmartApplicati
}
@Override
public int hashCode() {
return 31 * this.id.hashCode() + ObjectUtils.nullSafeHashCode(getSession());
public String getDestination() {
return this.destination;
}
@Override
@@ -449,7 +433,7 @@ public class MultiServerUserRegistry implements SimpUserRegistry, SmartApplicati
if (this == other) {
return true;
}
if (other == null || !(other instanceof SimpSubscription)) {
if (!(other instanceof SimpSubscription)) {
return false;
}
SimpSubscription otherSubscription = (SimpSubscription) other;
@@ -457,12 +441,18 @@ public class MultiServerUserRegistry implements SimpUserRegistry, SmartApplicati
this.id.equals(otherSubscription.getId()));
}
@Override
public int hashCode() {
return this.id.hashCode() * 31 + ObjectUtils.nullSafeHashCode(getSession());
}
@Override
public String toString() {
return "destination=" + this.destination;
}
}
private static class NoOpSmartApplicationListener implements SmartApplicationListener {
@Override

View File

@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.user;
import java.util.Set;

View File

@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.user;
/**

View File

@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.user;
/**
@@ -26,7 +27,7 @@ public interface SimpSubscriptionMatcher {
/**
* Match the given subscription.
* @param subscription the subscription to match
* @return {@code true} in case of match, {@code false} otherwise.
* @return {@code true} in case of a match, {@code false} otherwise
*/
boolean match(SimpSubscription subscription);

View File

@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.user;
import java.util.Set;

View File

@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.user;
import java.util.Set;

View File

@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.user;
import java.util.concurrent.ScheduledFuture;
@@ -64,7 +65,6 @@ public class UserRegistryMessageHandler implements MessageHandler, ApplicationLi
Assert.hasText(broadcastDestination, "'broadcastDestination' is required");
Assert.notNull(scheduler, "'scheduler' is required");
this.userRegistry = (MultiServerUserRegistry) userRegistry;
this.brokerTemplate = brokerTemplate;
this.broadcastDestination = broadcastDestination;

View File

@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.user;
import java.util.Collections;
@@ -48,12 +49,12 @@ public class UserSessionRegistryAdapter implements SimpUserRegistry {
@Override
public Set<SimpUser> getUsers() {
throw new UnsupportedOperationException("UserSessionRegistry does not expose a listing of users.");
throw new UnsupportedOperationException("UserSessionRegistry does not expose a listing of users");
}
@Override
public Set<SimpSubscription> findSubscriptions(SimpSubscriptionMatcher matcher) {
throw new UnsupportedOperationException("UserSessionRegistry does not support operations across users.");
throw new UnsupportedOperationException("UserSessionRegistry does not support operations across users");
}
@@ -63,7 +64,6 @@ public class UserSessionRegistryAdapter implements SimpUserRegistry {
private final Map<String, SimpSession> sessions;
public SimpleSimpUser(String name, Set<String> sessionIds) {
this.name = name;
this.sessions = new HashMap<String, SimpSession>(sessionIds.size());
@@ -93,11 +93,11 @@ public class UserSessionRegistryAdapter implements SimpUserRegistry {
}
}
private static class SimpleSimpSession implements SimpSession {
private final String id;
public SimpleSimpSession(String id) {
this.id = id;
}

View File

@@ -59,11 +59,10 @@ import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
/**
* An implementation of {@link org.springframework.messaging.tcp.TcpOperations}
* based on the TCP client support of the Reactor project.
* <p>
*
* <p>This implementation wraps N (Reactor) clients for N {@link #connect} calls,
* i.e. a separate (Reactor) client instance for each connection.
*
@@ -76,13 +75,14 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@SuppressWarnings("rawtypes")
public static final Class<NettyTcpClient> REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class;
private final NioEventLoopGroup eventLoopGroup;
private final TcpClientFactory<Message<P>, Message<P>> tcpClientSpecFactory;
private final List<TcpClient<Message<P>, Message<P>>> tcpClients =
new ArrayList<TcpClient<Message<P>, Message<P>>>();
private final NioEventLoopGroup eventLoopGroup;
private boolean stopping;
@@ -94,17 +94,14 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
* threads will be shared amongst the active clients.
* <p>Also see the constructor accepting a ready Reactor
* {@link TcpClientSpec} {@link Function} factory.
*
* @param host the host to connect to
* @param port the port to connect to
* @param host the host to connect to
* @param port the port to connect to
* @param codec the codec to use for encoding and decoding the TCP stream
*/
public Reactor2TcpClient(final String host, final int port, final Codec<Buffer, Message<P>, Message<P>> codec) {
this.eventLoopGroup = initEventLoopGroup();
this.tcpClientSpecFactory = new TcpClientFactory<Message<P>, Message<P>>() {
@Override
public TcpClientSpec<Message<P>, Message<P>> apply(TcpClientSpec<Message<P>, Message<P>> spec) {
return spec
@@ -116,12 +113,28 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
};
}
/**
* A constructor with a pre-configured {@link TcpClientSpec} {@link Function}
* factory. This might be used to add SSL or specific network parameters to
* the generated client configuration.
* <p><strong>NOTE:</strong> if the client is configured with a thread-creating
* dispatcher, you are responsible for cleaning them, e.g. using
* {@link reactor.core.Dispatcher#shutdown}.
* @param tcpClientSpecFactory the TcpClientSpec {@link Function} to use for each client creation
*/
public Reactor2TcpClient(TcpClientFactory<Message<P>, Message<P>> tcpClientSpecFactory) {
Assert.notNull(tcpClientSpecFactory, "'tcpClientClientFactory' must not be null");
this.tcpClientSpecFactory = tcpClientSpecFactory;
this.eventLoopGroup = null;
}
private static NioEventLoopGroup initEventLoopGroup() {
int ioThreadCount;
try {
ioThreadCount = Integer.parseInt(System.getProperty("reactor.tcp.ioThreadCount"));
}
catch (Exception i) {
catch (Exception ex) {
ioThreadCount = -1;
}
if (ioThreadCount <= 0l) {
@@ -132,26 +145,10 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
new NamedDaemonThreadFactory("reactor-tcp-io"));
}
/**
* A constructor with a pre-configured {@link TcpClientSpec} {@link Function}
* factory. This might be used to add SSL or specific network parameters to
* the generated client configuration.
* <p><strong>NOTE:</strong> if the client is configured with a thread-creating
* dispatcher, you are responsible for cleaning them, e.g. using
* {@link reactor.core.Dispatcher#shutdown}.
*
* @param tcpClientSpecFactory the TcpClientSpec {@link Function} to use for each client creation.
*/
public Reactor2TcpClient(TcpClientFactory<Message<P>, Message<P>> tcpClientSpecFactory) {
Assert.notNull(tcpClientSpecFactory, "'tcpClientClientFactory' must not be null");
this.tcpClientSpecFactory = tcpClientSpecFactory;
this.eventLoopGroup = null;
}
@Override
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> connectionHandler) {
Assert.notNull(connectionHandler, "'connectionHandler' must not be null");
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
TcpClient<Message<P>, Message<P>> tcpClient;
synchronized (this.tcpClients) {
@@ -178,8 +175,8 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy strategy) {
Assert.notNull(connectionHandler, "'connectionHandler' must not be null");
Assert.notNull(strategy, "'reconnectStrategy' must not be null");
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
Assert.notNull(strategy, "ReconnectStrategy must not be null");
TcpClient<Message<P>, Message<P>> tcpClient;
synchronized (this.tcpClients) {
@@ -204,6 +201,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
synchronized (this.tcpClients) {
this.stopping = true;
}
Promise<Void> promise = Streams.from(this.tcpClients)
.flatMap(new Function<TcpClient<Message<P>, Message<P>>, Promise<Void>>() {
@Override
@@ -217,6 +215,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
}
})
.next();
if (this.eventLoopGroup != null) {
final Promise<Void> eventLoopPromise = Promises.prepare();
promise.onComplete(new Consumer<Promise<Void>>() {
@@ -249,6 +248,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
}
}
private static class MessageChannelStreamHandler<P>
implements ReactorChannelHandler<Message<P>, Message<P>, ChannelStream<Message<P>, Message<P>>> {
@@ -260,14 +260,10 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@Override
public Publisher<Void> apply(ChannelStream<Message<P>, Message<P>> channelStream) {
Promise<Void> closePromise = Promises.prepare();
this.connectionHandler.afterConnected(new Reactor2TcpConnection<P>(channelStream, closePromise));
channelStream
.finallyDo(new Consumer<Signal<Message<P>>>() {
@Override
public void accept(Signal<Message<P>> signal) {
if (signal.isOnError()) {
@@ -279,7 +275,6 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
}
})
.consume(new Consumer<Message<P>>() {
@Override
public void accept(Message<P> message) {
connectionHandler.handleMessage(message);
@@ -290,6 +285,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
}
}
private static class ReactorReconnectAdapter implements Reconnect {
private final ReconnectStrategy strategy;
@@ -300,7 +296,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@Override
public Tuple2<InetSocketAddress, Long> reconnect(InetSocketAddress address, int attempt) {
return Tuple.of(address, strategy.getTimeToNextAttempt(attempt));
return Tuple.of(address, this.strategy.getTimeToNextAttempt(attempt));
}
}