GH-3526: Fix Infinite Loop in FailoverCConnFactory

Resolves https://github.com/spring-projects/spring-integration/issues/3526

`FailoverClientConnectionFactory`

The logic to detect we had iterated over all factories and including
the one from which the previous connection was established was incorrect,
causing an infite loop until one of the factory connections was successful.

Change the logic to detect we have reset the iterator and the current failure
is from the same factory as the one from which the previous connection was
established.

**cherry-pick to 5.4.x, 5.3.x**

* Add diagnostics.

* Fix race in test.

* More race fixes and diagnostics.

* Remove diagnostics.
This commit is contained in:
Gary Russell
2021-03-25 17:46:54 -04:00
parent ad27d5ab4e
commit 98edb9d24d
3 changed files with 70 additions and 18 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -257,7 +257,7 @@ public class FailoverClientConnectionFactory extends AbstractClientConnectionFac
if (!this.factoryIterator.hasNext()) {
this.factoryIterator = this.connectionFactories.iterator();
}
boolean retried = false;
boolean restartedList = false;
while (!success) {
try {
nextFactory = this.factoryIterator.next();
@@ -275,17 +275,18 @@ public class FailoverClientConnectionFactory extends AbstractClientConnectionFac
+ e.toString()
+ ", trying another");
}
if (restartedList && (lastFactoryToTry == null || lastFactoryToTry.equals(nextFactory))) {
logger.debug("Failover failed to find a connection");
/*
* We've tried every factory including the
* one the current connection was on.
*/
this.open = false;
throw e;
}
if (!this.factoryIterator.hasNext()) {
if (retried && (lastFactoryToTry == null || lastFactoryToTry.equals(nextFactory))) {
/*
* We've tried every factory including the
* one the current connection was on.
*/
this.open = false;
throw e;
}
this.factoryIterator = this.connectionFactories.iterator();
retried = true;
restartedList = true;
}
}
}

View File

@@ -277,10 +277,14 @@ public abstract class TcpConnectionSupport implements TcpConnection {
@Override
public TcpListener getListener() {
if (this.manualListenerRegistration) {
if (this.logger.isDebugEnabled()) {
boolean debugEnabled = this.logger.isDebugEnabled();
if (debugEnabled) {
this.logger.debug(getConnectionId() + " Waiting for listener registration");
}
waitForListenerRegistration();
if (debugEnabled) {
this.logger.debug(getConnectionId() + " Listener registered");
}
}
return this.listener;
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,8 +28,11 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
@@ -41,7 +44,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import javax.net.ServerSocketFactory;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
@@ -62,6 +67,7 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* @author Gary Russell
@@ -173,7 +179,7 @@ public class FailoverClientConnectionFactoryTests {
inOrder.verifyNoMoreInteractions();
}
@Test(expected = UncheckedIOException.class)
@Test
public void testFailoverAllDead() throws Exception {
AbstractClientConnectionFactory factory1 = mock(AbstractClientConnectionFactory.class);
AbstractClientConnectionFactory factory2 = mock(AbstractClientConnectionFactory.class);
@@ -193,10 +199,50 @@ public class FailoverClientConnectionFactoryTests {
FailoverClientConnectionFactory failoverFactory = new FailoverClientConnectionFactory(factories);
failoverFactory.start();
GenericMessage<String> message = new GenericMessage<String>("foo");
failoverFactory.getConnection().send(message);
assertThatExceptionOfType(UncheckedIOException.class).isThrownBy(() ->
failoverFactory.getConnection().send(message));
Mockito.verify(conn2).send(message);
}
@Test
void failoverAllDeadAfterSuccess() throws Exception {
ServerSocket ss1 = ServerSocketFactory.getDefault().createServerSocket(0);
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.initialize();
exec.submit(() -> {
Socket accepted = ss1.accept();
BufferedReader br = new BufferedReader(new InputStreamReader(accepted.getInputStream()));
br.readLine();
accepted.getOutputStream().write("ok\r\n".getBytes());
accepted.close();
ss1.close();
return true;
});
TcpNetClientConnectionFactory cf1 = new TcpNetClientConnectionFactory("localhost", ss1.getLocalPort());
AbstractClientConnectionFactory cf2 = mock(AbstractClientConnectionFactory.class);
List<AbstractClientConnectionFactory> factories = new ArrayList<AbstractClientConnectionFactory>();
factories.add(cf1);
factories.add(cf2);
doThrow(new UncheckedIOException(new IOException("fail"))).when(cf2).getConnection();
CountDownLatch latch = new CountDownLatch(2);
cf1.setApplicationEventPublisher(event -> {
if (event instanceof TcpConnectionCloseEvent) {
latch.countDown();
}
});
cf2.setApplicationEventPublisher(event -> { });
FailoverClientConnectionFactory fccf = new FailoverClientConnectionFactory(factories);
fccf.registerListener(msf -> {
latch.countDown();
return false;
});
fccf.start();
fccf.getConnection().send(new GenericMessage<>("test"));
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThatExceptionOfType(UncheckedIOException.class).isThrownBy(() ->
fccf.getConnection().send(new GenericMessage<>("test")));
}
@Test
public void testFailoverAllDeadButOriginalOkAgain() throws Exception {
AbstractClientConnectionFactory factory1 = mock(AbstractClientConnectionFactory.class);
@@ -228,7 +274,7 @@ public class FailoverClientConnectionFactoryTests {
Mockito.verify(conn1, times(2)).send(message);
}
@Test(expected = UncheckedIOException.class)
@Test
public void testFailoverConnectNone() throws Exception {
AbstractClientConnectionFactory factory1 = mock(AbstractClientConnectionFactory.class);
AbstractClientConnectionFactory factory2 = mock(AbstractClientConnectionFactory.class);
@@ -242,7 +288,8 @@ public class FailoverClientConnectionFactoryTests {
FailoverClientConnectionFactory failoverFactory = new FailoverClientConnectionFactory(factories);
failoverFactory.start();
GenericMessage<String> message = new GenericMessage<String>("foo");
failoverFactory.getConnection().send(message);
assertThatExceptionOfType(UncheckedIOException.class).isThrownBy(() ->
failoverFactory.getConnection().send(message));
}
@Test