Polishing

This commit is contained in:
Juergen Hoeller
2017-02-21 22:41:40 +01:00
parent f219680d42
commit d2cc97af47
16 changed files with 83 additions and 80 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -38,8 +38,7 @@ import org.springframework.util.MultiValueMap;
public abstract class AbstractSubscriptionRegistry implements SubscriptionRegistry {
private static MultiValueMap<String, String> EMPTY_MAP =
CollectionUtils.unmodifiableMultiValueMap(new LinkedMultiValueMap<String, String>(0));
CollectionUtils.unmodifiableMultiValueMap(new LinkedMultiValueMap<>(0));
protected final Log logger = LogFactory.getLog(getClass());
@@ -55,19 +54,25 @@ public abstract class AbstractSubscriptionRegistry implements SubscriptionRegist
String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
if (sessionId == null) {
logger.error("No sessionId in " + message);
if (logger.isErrorEnabled()) {
logger.error("No sessionId in " + message);
}
return;
}
String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers);
if (subscriptionId == null) {
logger.error("No subscriptionId in " + message);
if (logger.isErrorEnabled()) {
logger.error("No subscriptionId in " + message);
}
return;
}
String destination = SimpMessageHeaderAccessor.getDestination(headers);
if (destination == null) {
logger.error("No destination in " + message);
if (logger.isErrorEnabled()) {
logger.error("No destination in " + message);
}
return;
}
@@ -85,13 +90,17 @@ public abstract class AbstractSubscriptionRegistry implements SubscriptionRegist
String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
if (sessionId == null) {
logger.error("No sessionId in " + message);
if (logger.isErrorEnabled()) {
logger.error("No sessionId in " + message);
}
return;
}
String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers);
if (subscriptionId == null) {
logger.error("No subscriptionId " + message);
if (logger.isErrorEnabled()) {
logger.error("No subscriptionId " + message);
}
return;
}
@@ -109,7 +118,9 @@ public abstract class AbstractSubscriptionRegistry implements SubscriptionRegist
String destination = SimpMessageHeaderAccessor.getDestination(headers);
if (destination == null) {
logger.error("No destination in " + message);
if (logger.isErrorEnabled()) {
logger.error("No destination in " + message);
}
return EMPTY_MAP;
}
@@ -117,14 +128,13 @@ public abstract class AbstractSubscriptionRegistry implements SubscriptionRegist
}
protected abstract void addSubscriptionInternal(String sessionId, String subscriptionId,
protected abstract void addSubscriptionInternal(
String sessionId, String subscriptionId, String destination, Message<?> message);
protected abstract void removeSubscriptionInternal(
String sessionId, String subscriptionId, Message<?> message);
protected abstract MultiValueMap<String, String> findSubscriptionsInternal(
String destination, Message<?> message);
protected abstract void removeSubscriptionInternal(String sessionId, String subscriptionId, Message<?> message);
@Override
public abstract void unregisterAllSubscriptions(String sessionId);
protected abstract MultiValueMap<String, String> findSubscriptionsInternal(String destination, Message<?> message);
}

View File

@@ -136,8 +136,8 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
@Override
protected void addSubscriptionInternal(String sessionId, String subsId, String destination,
Message<?> message) {
protected void addSubscriptionInternal(
String sessionId, String subsId, String destination, Message<?> message) {
Expression expression = null;
MessageHeaders headers = message.getHeaders();
@@ -478,13 +478,14 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
}
private static class Subscription {
private static final class Subscription {
private final String id;
private final Expression selectorExpression;
public Subscription(String id, Expression selector) {
Assert.notNull(id, "Subscription id must not be null");
this.id = id;
this.selectorExpression = selector;
}
@@ -499,19 +500,12 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
return getId().equals(((Subscription) other).getId());
return (this == other || (other instanceof Subscription && this.id.equals(((Subscription) other).id)));
}
@Override
public int hashCode() {
return getId().hashCode();
return this.id.hashCode();
}
@Override

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,7 +16,6 @@
package org.springframework.messaging.tcp;
/**
* A contract to determine the frequency of reconnect attempts after connection failure.
*

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.tcp.reactor;
import java.nio.ByteBuffer;
@@ -41,13 +42,14 @@ public abstract class AbstractNioBufferReactorNettyCodec<P> implements ReactorNe
return messages;
}
protected abstract List<Message<P>> decodeInternal(ByteBuffer nioBuffer);
@Override
public void encode(Message<P> message, ByteBuf outputBuffer) {
outputBuffer.writeBytes(encodeInternal(message));
}
protected abstract List<Message<P>> decodeInternal(ByteBuffer nioBuffer);
protected abstract ByteBuffer encodeInternal(Message<P> message);
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,12 +28,10 @@ import reactor.core.publisher.Mono;
*/
class MonoToListenableFutureAdapter<T> extends AbstractMonoToListenableFutureAdapter<T, T> {
public MonoToListenableFutureAdapter(Mono<T> mono) {
super(mono);
}
@Override
protected T adapt(T result) {
return result;

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -115,6 +115,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
@Override
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> handler) {
Assert.notNull(handler, "TcpConnectionHandler is required");
if (this.stopping) {
return handleShuttingDownConnectFailure(handler);
}
@@ -131,6 +132,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
Assert.notNull(handler, "TcpConnectionHandler is required");
Assert.notNull(strategy, "ReconnectStrategy is required");
if (this.stopping) {
return handleShuttingDownConnectFailure(handler);
}
@@ -189,7 +191,6 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
ChannelGroupFuture close = this.channelGroup.close();
Mono<Void> completion = FutureMono.from(close)
.doAfterTerminate((x, e) -> {
// TODO: https://github.com/reactor/reactor-netty/issues/24
shutdownGlobalResources();
@@ -211,14 +212,14 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
return new MonoToListenableFutureAdapter<>(completion);
}
private static void shutdownGlobalResources() {
private void shutdownGlobalResources() {
try {
Method method = TcpResources.class.getDeclaredMethod("_dispose");
ReflectionUtils.makeAccessible(method);
ReflectionUtils.invokeMethod(method, TcpResources.get());
}
catch (NoSuchMethodException ex) {
ex.printStackTrace();
// ignore
}
}
@@ -227,15 +228,13 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
private final TcpConnectionHandler<P> connectionHandler;
ReactorNettyHandler(TcpConnectionHandler<P> handler) {
this.connectionHandler = handler;
}
@SuppressWarnings("unchecked")
@Override
@SuppressWarnings("unchecked")
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
DirectProcessor<Void> completion = DirectProcessor.create();
TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion);
scheduler.schedule(() -> connectionHandler.afterConnected(connection));
@@ -254,6 +253,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
}
}
private static class StompMessageDecoder<P> extends ByteToMessageDecoder {
private final ReactorNettyCodec<P> codec;

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -66,7 +66,6 @@ public class ReactorNettyTcpConnection<P> implements TcpConnection<P> {
@Override
@SuppressWarnings("deprecation")
public void onReadInactivity(Runnable runnable, long inactivityDuration) {
// TODO: workaround for https://github.com/reactor/reactor-netty/issues/22
ChannelPipeline pipeline = this.inbound.context().channel().pipeline();
String name = NettyPipeline.OnChannelReadIdle;