Differentiate between initial exception handling, recovery and recovery after subscription.

We now differentiate exception handling regarding the recovery state. Initial listen fails if the connection is unavailable. Upon recovery after a preceeding subscription we now log the success to create a counterpart to our error logging.

Closes: #2782
Original Pull Request: #2808
This commit is contained in:
Mark Paluch
2023-12-14 14:00:11 +01:00
committed by Christoph Strobl
parent b36ac3ea2a
commit 13f87a2332
2 changed files with 147 additions and 15 deletions

View File

@@ -370,7 +370,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
State state = this.state.get();
CompletableFuture<Void> futureToAwait = state.isPrepareListening() ? containerListenFuture
: lazyListen(this.backOff.start());
: lazyListen(new InitialBackoffExecution(this.backOff.start()));
try {
futureToAwait.get(getMaxSubscriptionRegistrationWaitingTime(), TimeUnit.MILLISECONDS);
@@ -531,8 +531,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
future.get(getMaxSubscriptionRegistrationWaitingTime(), TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (ExecutionException | TimeoutException ignore) {
}
} catch (ExecutionException | TimeoutException ignore) {}
}
@Override
@@ -876,7 +875,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
if (recoveryInterval != BackOffExecution.STOP) {
String message = String.format("Connection failure occurred: %s; Restarting subscription task after %s ms",
cause, recoveryInterval);
cause, recoveryInterval);
logger.error(message, cause);
}
@@ -885,8 +884,13 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
Runnable recoveryFunction = () -> {
CompletableFuture<Void> lazyListen = lazyListen(backOffExecution);
lazyListen.whenComplete(propagate(future));
CompletableFuture<Void> lazyListen = lazyListen(new RecoveryBackoffExecution(backOffExecution));
lazyListen.whenComplete(propagate(future)).thenRun(() -> {
if (backOffExecution instanceof RecoveryAfterSubscriptionBackoffExecution) {
logger.info("Subscription(s) recovered");
}
});
};
if (potentiallyRecover(loggingBackOffExecution, recoveryFunction)) {
@@ -980,7 +984,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
private Subscriber getRequiredSubscriber() {
Assert.state(this.subscriber != null,
"Subscriber not created; Configure RedisConnectionFactory to create a Subscriber");
"Subscriber not created; Configure RedisConnectionFactory to create a Subscriber. Make sure that afterPropertiesSet() has been called");
return this.subscriber;
}
@@ -1018,6 +1022,54 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
}
}
BackOffExecution nextBackoffExecution(BackOffExecution backOffExecution, boolean subscribed) {
if (subscribed) {
return new RecoveryAfterSubscriptionBackoffExecution(backOff.start());
}
return backOffExecution;
}
/**
* Marker for an initial backoff.
*
* @param delegate
*/
record InitialBackoffExecution(BackOffExecution delegate) implements BackOffExecution {
@Override
public long nextBackOff() {
return delegate.nextBackOff();
}
}
/**
* Marker for a recovery after a subscription has been active previously.
*
* @param delegate
*/
record RecoveryAfterSubscriptionBackoffExecution(BackOffExecution delegate) implements BackOffExecution {
@Override
public long nextBackOff() {
return delegate.nextBackOff();
}
}
/**
* Marker for a recovery execution.
*
* @param delegate
*/
record RecoveryBackoffExecution(BackOffExecution delegate) implements BackOffExecution {
@Override
public long nextBackOff() {
return delegate.nextBackOff();
}
}
/**
* Represents an operation that accepts three input arguments {@link SubscriptionListener},
* {@code channel or pattern}, and {@code count} and returns no result.
@@ -1191,7 +1243,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
if (connection.isSubscribed()) {
initFuture.completeExceptionally(
new IllegalStateException("Retrieved connection is already subscribed; aborting listening"));
new IllegalStateException("Retrieved connection is already subscribed; aborting listening"));
return initFuture;
}
@@ -1199,10 +1251,15 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
try {
eventuallyPerformSubscription(connection, backOffExecution, initFuture, patterns, channels);
} catch (Throwable t) {
handleSubscriptionException(initFuture, backOffExecution, t);
handleSubscriptionException(initFuture, nextBackoffExecution(backOffExecution, connection.isSubscribed()),
t);
}
} catch (RuntimeException ex) {
initFuture.completeExceptionally(ex);
if (backOffExecution instanceof InitialBackoffExecution) {
initFuture.completeExceptionally(ex);
} else {
handleSubscriptionException(initFuture, backOffExecution, ex);
}
}
return initFuture;
@@ -1215,8 +1272,9 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
void eventuallyPerformSubscription(RedisConnection connection, BackOffExecution backOffExecution,
CompletableFuture<Void> subscriptionDone, Collection<byte[]> patterns, Collection<byte[]> channels) {
addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels,
() -> subscriptionDone.complete(null)));
addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels, () -> {
subscriptionDone.complete(null);
}));
doSubscribe(connection, patterns, channels);
}
@@ -1381,7 +1439,10 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
}
private void doInLock(Runnable runner) {
doInLock(() -> { runner.run(); return null; });
doInLock(() -> {
runner.run();
return null;
});
}
private <T> T doInLock(Supplier<T> supplier) {
@@ -1432,7 +1493,7 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
try {
subscribeChannel(channels.toArray(new byte[0][]));
} catch (Exception ex) {
handleSubscriptionException(subscriptionDone, backOffExecution, ex);
handleSubscriptionException(subscriptionDone, nextBackoffExecution(backOffExecution, true), ex);
}
}));
} else {
@@ -1449,7 +1510,8 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab
closeConnection();
unsubscribeFuture.complete(null);
} catch (Throwable cause) {
handleSubscriptionException(subscriptionDone, backOffExecution, cause);
handleSubscriptionException(subscriptionDone,
nextBackoffExecution(backOffExecution, connection.isSubscribed()), cause);
}
});
}

View File

@@ -19,10 +19,15 @@ import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.data.redis.RedisConnectionFailureException;
import org.springframework.data.redis.connection.RedisConnection;
@@ -32,6 +37,7 @@ import org.springframework.data.redis.connection.SubscriptionListener;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.listener.adapter.RedisListenerExecutionFailedException;
import org.springframework.util.backoff.FixedBackOff;
/**
* Unit tests for {@link RedisMessageListenerContainer}.
@@ -147,6 +153,70 @@ class RedisMessageListenerContainerUnitTests {
assertThat(container.isListening()).isFalse();
}
@Test // GH-2335
void shouldRecoverFromConnectionFailure() throws Exception {
AtomicInteger requestCount = new AtomicInteger();
AtomicBoolean shouldThrowSubscriptionException = new AtomicBoolean();
container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactoryMock);
container.setBeanName("container");
container.setTaskExecutor(new SyncTaskExecutor());
container.setSubscriptionExecutor(new SimpleAsyncTaskExecutor());
container.setMaxSubscriptionRegistrationWaitingTime(1000);
container.setRecoveryBackoff(new FixedBackOff(1, 5));
container.afterPropertiesSet();
doAnswer(it -> {
int req = requestCount.incrementAndGet();
if (req == 1 || req == 3) {
return connectionMock;
}
throw new RedisConnectionFailureException("Booh");
}).when(connectionFactoryMock).getConnection();
CountDownLatch exceptionWait = new CountDownLatch(1);
CountDownLatch armed = new CountDownLatch(1);
CountDownLatch recoveryArmed = new CountDownLatch(1);
doAnswer(it -> {
SubscriptionListener listener = it.getArgument(0);
when(connectionMock.isSubscribed()).thenReturn(true);
listener.onChannelSubscribed("a".getBytes(StandardCharsets.UTF_8), 1);
armed.countDown();
exceptionWait.await();
if (shouldThrowSubscriptionException.compareAndSet(true, false)) {
when(connectionMock.isSubscribed()).thenReturn(false);
throw new RedisConnectionFailureException("Disconnected");
}
recoveryArmed.countDown();
return null;
}).when(connectionMock).subscribe(any(), any());
container.start();
container.addMessageListener(new MessageListenerAdapter(handler), new ChannelTopic("a"));
armed.await();
// let an exception happen
shouldThrowSubscriptionException.set(true);
exceptionWait.countDown();
// wait for subscription recovery
recoveryArmed.await();
assertThat(recoveryArmed.getCount()).isZero();
}
@Test // GH-964
void failsOnDuplicateInit() {
assertThatIllegalStateException().isThrownBy(() -> container.afterPropertiesSet());