Fix spelling in SynchronizingMessageListener.SubscriptionSynchronizion.
Additionally, cleanup compiler warnings. Closes #2656 Original pull request: #2657
This commit is contained in:
@@ -17,8 +17,6 @@ package org.springframework.data.redis.connection;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
import org.springframework.lang.Nullable;
|
||||
|
||||
/**
|
||||
* Class encapsulating a Redis message body and its properties.
|
||||
*
|
||||
|
||||
@@ -671,7 +671,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
|
||||
if (wasListening) {
|
||||
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
|
||||
getRequiredSubscriber().addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronizion(patterns,
|
||||
getRequiredSubscriber().addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns,
|
||||
channels, () -> future.complete(null)));
|
||||
getRequiredSubscriber().subscribeChannel(channels.toArray(new byte[channels.size()][]));
|
||||
getRequiredSubscriber().subscribePattern(patterns.toArray(new byte[patterns.size()][]));
|
||||
@@ -1212,7 +1212,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
|
||||
void eventuallyPerformSubscription(RedisConnection connection, BackOffExecution backOffExecution,
|
||||
CompletableFuture<Void> subscriptionDone, Collection<byte[]> patterns, Collection<byte[]> channels) {
|
||||
|
||||
addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronizion(patterns, channels,
|
||||
addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels,
|
||||
() -> subscriptionDone.complete(null)));
|
||||
|
||||
doSubscribe(connection, patterns, channels);
|
||||
@@ -1240,7 +1240,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
|
||||
}
|
||||
}
|
||||
|
||||
void addSynchronization(SynchronizingMessageListener.SubscriptionSynchronizion synchronizer) {
|
||||
void addSynchronization(SynchronizingMessageListener.SubscriptionSynchronization synchronizer) {
|
||||
this.synchronizingMessageListener.addSynchronization(synchronizer);
|
||||
}
|
||||
|
||||
@@ -1413,7 +1413,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
|
||||
initiallySubscribeToChannels = Collections.emptySet();
|
||||
// perform channel subscription later as the first call to (p)subscribe blocks the client
|
||||
addSynchronization(
|
||||
new SynchronizingMessageListener.SubscriptionSynchronizion(patterns, Collections.emptySet(), () -> {
|
||||
new SynchronizingMessageListener.SubscriptionSynchronization(patterns, Collections.emptySet(), () -> {
|
||||
try {
|
||||
subscribeChannel(channels.toArray(new byte[0][]));
|
||||
} catch (Exception e) {
|
||||
@@ -1424,7 +1424,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
|
||||
initiallySubscribeToChannels = channels;
|
||||
}
|
||||
|
||||
addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronizion(patterns, channels,
|
||||
addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels,
|
||||
() -> subscriptionDone.complete(null)));
|
||||
|
||||
executor.execute(() -> {
|
||||
|
||||
@@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.springframework.data.redis.connection.Message;
|
||||
import org.springframework.data.redis.connection.MessageListener;
|
||||
@@ -34,7 +33,7 @@ import org.springframework.lang.Nullable;
|
||||
|
||||
/**
|
||||
* Synchronizing {@link MessageListener} and {@link SubscriptionListener} that allows notifying a {@link Runnable}
|
||||
* (through {@link SubscriptionSynchronizion}) upon completing subscriptions to channels or patterns.
|
||||
* (through {@link SubscriptionSynchronization}) upon completing subscriptions to channels or patterns.
|
||||
*
|
||||
* @author Mark Paluch
|
||||
* @since 3.0
|
||||
@@ -43,7 +42,7 @@ class SynchronizingMessageListener implements MessageListener, SubscriptionListe
|
||||
|
||||
private final MessageListener messageListener;
|
||||
private final SubscriptionListener subscriptionListener;
|
||||
private final List<SubscriptionSynchronizion> synchronizations = new CopyOnWriteArrayList<>();
|
||||
private final List<SubscriptionSynchronization> synchronizations = new CopyOnWriteArrayList<>();
|
||||
|
||||
public SynchronizingMessageListener(MessageListener messageListener, SubscriptionListener subscriptionListener) {
|
||||
this.messageListener = messageListener;
|
||||
@@ -51,11 +50,11 @@ class SynchronizingMessageListener implements MessageListener, SubscriptionListe
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a {@link SubscriptionSynchronizion}.
|
||||
* Register a {@link SubscriptionSynchronization}.
|
||||
*
|
||||
* @param synchronization must not be {@literal null}.
|
||||
*/
|
||||
public void addSynchronization(SubscriptionSynchronizion synchronization) {
|
||||
public void addSynchronization(SubscriptionSynchronization synchronization) {
|
||||
this.synchronizations.add(synchronization);
|
||||
}
|
||||
|
||||
@@ -68,7 +67,7 @@ class SynchronizingMessageListener implements MessageListener, SubscriptionListe
|
||||
public void onChannelSubscribed(byte[] channel, long count) {
|
||||
|
||||
subscriptionListener.onChannelSubscribed(channel, count);
|
||||
handleSubscription(channel, SubscriptionSynchronizion::onChannelSubscribed);
|
||||
handleSubscription(channel, SubscriptionSynchronization::onChannelSubscribed);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -80,7 +79,7 @@ class SynchronizingMessageListener implements MessageListener, SubscriptionListe
|
||||
public void onPatternSubscribed(byte[] pattern, long count) {
|
||||
|
||||
subscriptionListener.onPatternSubscribed(pattern, count);
|
||||
handleSubscription(pattern, SubscriptionSynchronizion::onPatternSubscribed);
|
||||
handleSubscription(pattern, SubscriptionSynchronization::onPatternSubscribed);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -89,16 +88,16 @@ class SynchronizingMessageListener implements MessageListener, SubscriptionListe
|
||||
}
|
||||
|
||||
void handleSubscription(byte[] topic,
|
||||
BiFunction<SubscriptionSynchronizion, ByteArrayWrapper, Boolean> synchronizerCallback) {
|
||||
BiFunction<SubscriptionSynchronization, ByteArrayWrapper, Boolean> synchronizerCallback) {
|
||||
|
||||
if (synchronizations.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
ByteArrayWrapper binaryChannel = new ByteArrayWrapper(topic);
|
||||
List<SubscriptionSynchronizion> finalized = new ArrayList<>(synchronizations.size());
|
||||
List<SubscriptionSynchronization> finalized = new ArrayList<>(synchronizations.size());
|
||||
|
||||
for (SubscriptionSynchronizion synchronizer : synchronizations) {
|
||||
for (SubscriptionSynchronization synchronizer : synchronizations) {
|
||||
|
||||
if (synchronizerCallback.apply(synchronizer, binaryChannel)) {
|
||||
finalized.add(synchronizer);
|
||||
@@ -111,21 +110,22 @@ class SynchronizingMessageListener implements MessageListener, SubscriptionListe
|
||||
/**
|
||||
* Synchronization to await subscriptions for channels and patterns.
|
||||
*/
|
||||
static class SubscriptionSynchronizion {
|
||||
static class SubscriptionSynchronization {
|
||||
|
||||
private static final AtomicIntegerFieldUpdater<SubscriptionSynchronizion> DONE = AtomicIntegerFieldUpdater
|
||||
.newUpdater(SubscriptionSynchronizion.class, "done");
|
||||
private static final AtomicIntegerFieldUpdater<SubscriptionSynchronization> DONE = AtomicIntegerFieldUpdater
|
||||
.newUpdater(SubscriptionSynchronization.class, "done");
|
||||
|
||||
private static final int NOT_DONE = 0;
|
||||
private static final int DONE_DONE = 0;
|
||||
|
||||
private volatile int done = NOT_DONE;
|
||||
private final Set<ByteArrayWrapper> remainingPatterns;
|
||||
private final Set<ByteArrayWrapper> remainingChannels;
|
||||
|
||||
private final Runnable doneCallback;
|
||||
|
||||
public SubscriptionSynchronizion(Collection<byte[]> remainingPatterns, Collection<byte[]> remainingChannels,
|
||||
private final Set<ByteArrayWrapper> remainingPatterns;
|
||||
private final Set<ByteArrayWrapper> remainingChannels;
|
||||
|
||||
public SubscriptionSynchronization(Collection<byte[]> remainingPatterns, Collection<byte[]> remainingChannels,
|
||||
Runnable doneCallback) {
|
||||
|
||||
if (remainingPatterns.isEmpty()) {
|
||||
@@ -133,7 +133,7 @@ class SynchronizingMessageListener implements MessageListener, SubscriptionListe
|
||||
} else {
|
||||
this.remainingPatterns = ConcurrentHashMap.newKeySet(remainingPatterns.size());
|
||||
this.remainingPatterns
|
||||
.addAll(remainingPatterns.stream().map(ByteArrayWrapper::new).collect(Collectors.toList()));
|
||||
.addAll(remainingPatterns.stream().map(ByteArrayWrapper::new).toList());
|
||||
}
|
||||
|
||||
if (remainingChannels.isEmpty()) {
|
||||
@@ -141,7 +141,7 @@ class SynchronizingMessageListener implements MessageListener, SubscriptionListe
|
||||
} else {
|
||||
this.remainingChannels = ConcurrentHashMap.newKeySet(remainingChannels.size());
|
||||
this.remainingChannels
|
||||
.addAll(remainingChannels.stream().map(ByteArrayWrapper::new).collect(Collectors.toList()));
|
||||
.addAll(remainingChannels.stream().map(ByteArrayWrapper::new).toList());
|
||||
}
|
||||
|
||||
this.doneCallback = doneCallback;
|
||||
|
||||
Reference in New Issue
Block a user