From 98edb9d24d53d7f0e645af47a346de5ee509482d Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Thu, 25 Mar 2021 17:46:54 -0400 Subject: [PATCH] GH-3526: Fix Infinite Loop in FailoverCConnFactory Resolves https://github.com/spring-projects/spring-integration/issues/3526 `FailoverClientConnectionFactory` The logic to detect we had iterated over all factories and including the one from which the previous connection was established was incorrect, causing an infite loop until one of the factory connections was successful. Change the logic to detect we have reset the iterator and the current failure is from the same factory as the one from which the previous connection was established. **cherry-pick to 5.4.x, 5.3.x** * Add diagnostics. * Fix race in test. * More race fixes and diagnostics. * Remove diagnostics. --- .../FailoverClientConnectionFactory.java | 23 ++++---- .../tcp/connection/TcpConnectionSupport.java | 6 +- .../FailoverClientConnectionFactoryTests.java | 59 +++++++++++++++++-- 3 files changed, 70 insertions(+), 18 deletions(-) diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactory.java index edef76d76d..b64fd8cb73 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactory.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -257,7 +257,7 @@ public class FailoverClientConnectionFactory extends AbstractClientConnectionFac if (!this.factoryIterator.hasNext()) { this.factoryIterator = this.connectionFactories.iterator(); } - boolean retried = false; + boolean restartedList = false; while (!success) { try { nextFactory = this.factoryIterator.next(); @@ -275,17 +275,18 @@ public class FailoverClientConnectionFactory extends AbstractClientConnectionFac + e.toString() + ", trying another"); } + if (restartedList && (lastFactoryToTry == null || lastFactoryToTry.equals(nextFactory))) { + logger.debug("Failover failed to find a connection"); + /* + * We've tried every factory including the + * one the current connection was on. + */ + this.open = false; + throw e; + } if (!this.factoryIterator.hasNext()) { - if (retried && (lastFactoryToTry == null || lastFactoryToTry.equals(nextFactory))) { - /* - * We've tried every factory including the - * one the current connection was on. - */ - this.open = false; - throw e; - } this.factoryIterator = this.connectionFactories.iterator(); - retried = true; + restartedList = true; } } } diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java index 5b7f8ed2d1..4b75228ed4 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java @@ -277,10 +277,14 @@ public abstract class TcpConnectionSupport implements TcpConnection { @Override public TcpListener getListener() { if (this.manualListenerRegistration) { - if (this.logger.isDebugEnabled()) { + boolean debugEnabled = this.logger.isDebugEnabled(); + if (debugEnabled) { this.logger.debug(getConnectionId() + " Waiting for listener registration"); } waitForListenerRegistration(); + if (debugEnabled) { + this.logger.debug(getConnectionId() + " Listener registered"); + } } return this.listener; } diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java index 3dab3cd438..6d9fd801e2 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,8 +28,11 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStreamReader; import java.io.UncheckedIOException; +import java.net.ServerSocket; import java.net.Socket; import java.nio.channels.SocketChannel; import java.util.ArrayList; @@ -41,7 +44,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import org.junit.Test; +import javax.net.ServerSocketFactory; + +import org.junit.jupiter.api.Test; import org.mockito.InOrder; import org.mockito.Mockito; @@ -62,6 +67,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.support.GenericMessage; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; /** * @author Gary Russell @@ -173,7 +179,7 @@ public class FailoverClientConnectionFactoryTests { inOrder.verifyNoMoreInteractions(); } - @Test(expected = UncheckedIOException.class) + @Test public void testFailoverAllDead() throws Exception { AbstractClientConnectionFactory factory1 = mock(AbstractClientConnectionFactory.class); AbstractClientConnectionFactory factory2 = mock(AbstractClientConnectionFactory.class); @@ -193,10 +199,50 @@ public class FailoverClientConnectionFactoryTests { FailoverClientConnectionFactory failoverFactory = new FailoverClientConnectionFactory(factories); failoverFactory.start(); GenericMessage message = new GenericMessage("foo"); - failoverFactory.getConnection().send(message); + assertThatExceptionOfType(UncheckedIOException.class).isThrownBy(() -> + failoverFactory.getConnection().send(message)); Mockito.verify(conn2).send(message); } + @Test + void failoverAllDeadAfterSuccess() throws Exception { + ServerSocket ss1 = ServerSocketFactory.getDefault().createServerSocket(0); + ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor(); + exec.initialize(); + exec.submit(() -> { + Socket accepted = ss1.accept(); + BufferedReader br = new BufferedReader(new InputStreamReader(accepted.getInputStream())); + br.readLine(); + accepted.getOutputStream().write("ok\r\n".getBytes()); + accepted.close(); + ss1.close(); + return true; + }); + TcpNetClientConnectionFactory cf1 = new TcpNetClientConnectionFactory("localhost", ss1.getLocalPort()); + AbstractClientConnectionFactory cf2 = mock(AbstractClientConnectionFactory.class); + List factories = new ArrayList(); + factories.add(cf1); + factories.add(cf2); + doThrow(new UncheckedIOException(new IOException("fail"))).when(cf2).getConnection(); + CountDownLatch latch = new CountDownLatch(2); + cf1.setApplicationEventPublisher(event -> { + if (event instanceof TcpConnectionCloseEvent) { + latch.countDown(); + } + }); + cf2.setApplicationEventPublisher(event -> { }); + FailoverClientConnectionFactory fccf = new FailoverClientConnectionFactory(factories); + fccf.registerListener(msf -> { + latch.countDown(); + return false; + }); + fccf.start(); + fccf.getConnection().send(new GenericMessage<>("test")); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThatExceptionOfType(UncheckedIOException.class).isThrownBy(() -> + fccf.getConnection().send(new GenericMessage<>("test"))); + } + @Test public void testFailoverAllDeadButOriginalOkAgain() throws Exception { AbstractClientConnectionFactory factory1 = mock(AbstractClientConnectionFactory.class); @@ -228,7 +274,7 @@ public class FailoverClientConnectionFactoryTests { Mockito.verify(conn1, times(2)).send(message); } - @Test(expected = UncheckedIOException.class) + @Test public void testFailoverConnectNone() throws Exception { AbstractClientConnectionFactory factory1 = mock(AbstractClientConnectionFactory.class); AbstractClientConnectionFactory factory2 = mock(AbstractClientConnectionFactory.class); @@ -242,7 +288,8 @@ public class FailoverClientConnectionFactoryTests { FailoverClientConnectionFactory failoverFactory = new FailoverClientConnectionFactory(factories); failoverFactory.start(); GenericMessage message = new GenericMessage("foo"); - failoverFactory.getConnection().send(message); + assertThatExceptionOfType(UncheckedIOException.class).isThrownBy(() -> + failoverFactory.getConnection().send(message)); } @Test