diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactory.java index edef76d76d..b64fd8cb73 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactory.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -257,7 +257,7 @@ public class FailoverClientConnectionFactory extends AbstractClientConnectionFac if (!this.factoryIterator.hasNext()) { this.factoryIterator = this.connectionFactories.iterator(); } - boolean retried = false; + boolean restartedList = false; while (!success) { try { nextFactory = this.factoryIterator.next(); @@ -275,17 +275,18 @@ public class FailoverClientConnectionFactory extends AbstractClientConnectionFac + e.toString() + ", trying another"); } + if (restartedList && (lastFactoryToTry == null || lastFactoryToTry.equals(nextFactory))) { + logger.debug("Failover failed to find a connection"); + /* + * We've tried every factory including the + * one the current connection was on. + */ + this.open = false; + throw e; + } if (!this.factoryIterator.hasNext()) { - if (retried && (lastFactoryToTry == null || lastFactoryToTry.equals(nextFactory))) { - /* - * We've tried every factory including the - * one the current connection was on. - */ - this.open = false; - throw e; - } this.factoryIterator = this.connectionFactories.iterator(); - retried = true; + restartedList = true; } } } diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java index 5b7f8ed2d1..4b75228ed4 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java @@ -277,10 +277,14 @@ public abstract class TcpConnectionSupport implements TcpConnection { @Override public TcpListener getListener() { if (this.manualListenerRegistration) { - if (this.logger.isDebugEnabled()) { + boolean debugEnabled = this.logger.isDebugEnabled(); + if (debugEnabled) { this.logger.debug(getConnectionId() + " Waiting for listener registration"); } waitForListenerRegistration(); + if (debugEnabled) { + this.logger.debug(getConnectionId() + " Listener registered"); + } } return this.listener; } diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java index 3dab3cd438..6d9fd801e2 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,8 +28,11 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStreamReader; import java.io.UncheckedIOException; +import java.net.ServerSocket; import java.net.Socket; import java.nio.channels.SocketChannel; import java.util.ArrayList; @@ -41,7 +44,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import org.junit.Test; +import javax.net.ServerSocketFactory; + +import org.junit.jupiter.api.Test; import org.mockito.InOrder; import org.mockito.Mockito; @@ -62,6 +67,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.support.GenericMessage; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; /** * @author Gary Russell @@ -173,7 +179,7 @@ public class FailoverClientConnectionFactoryTests { inOrder.verifyNoMoreInteractions(); } - @Test(expected = UncheckedIOException.class) + @Test public void testFailoverAllDead() throws Exception { AbstractClientConnectionFactory factory1 = mock(AbstractClientConnectionFactory.class); AbstractClientConnectionFactory factory2 = mock(AbstractClientConnectionFactory.class); @@ -193,10 +199,50 @@ public class FailoverClientConnectionFactoryTests { FailoverClientConnectionFactory failoverFactory = new FailoverClientConnectionFactory(factories); failoverFactory.start(); GenericMessage message = new GenericMessage("foo"); - failoverFactory.getConnection().send(message); + assertThatExceptionOfType(UncheckedIOException.class).isThrownBy(() -> + failoverFactory.getConnection().send(message)); Mockito.verify(conn2).send(message); } + @Test + void failoverAllDeadAfterSuccess() throws Exception { + ServerSocket ss1 = ServerSocketFactory.getDefault().createServerSocket(0); + ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor(); + exec.initialize(); + exec.submit(() -> { + Socket accepted = ss1.accept(); + BufferedReader br = new BufferedReader(new InputStreamReader(accepted.getInputStream())); + br.readLine(); + accepted.getOutputStream().write("ok\r\n".getBytes()); + accepted.close(); + ss1.close(); + return true; + }); + TcpNetClientConnectionFactory cf1 = new TcpNetClientConnectionFactory("localhost", ss1.getLocalPort()); + AbstractClientConnectionFactory cf2 = mock(AbstractClientConnectionFactory.class); + List factories = new ArrayList(); + factories.add(cf1); + factories.add(cf2); + doThrow(new UncheckedIOException(new IOException("fail"))).when(cf2).getConnection(); + CountDownLatch latch = new CountDownLatch(2); + cf1.setApplicationEventPublisher(event -> { + if (event instanceof TcpConnectionCloseEvent) { + latch.countDown(); + } + }); + cf2.setApplicationEventPublisher(event -> { }); + FailoverClientConnectionFactory fccf = new FailoverClientConnectionFactory(factories); + fccf.registerListener(msf -> { + latch.countDown(); + return false; + }); + fccf.start(); + fccf.getConnection().send(new GenericMessage<>("test")); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThatExceptionOfType(UncheckedIOException.class).isThrownBy(() -> + fccf.getConnection().send(new GenericMessage<>("test"))); + } + @Test public void testFailoverAllDeadButOriginalOkAgain() throws Exception { AbstractClientConnectionFactory factory1 = mock(AbstractClientConnectionFactory.class); @@ -228,7 +274,7 @@ public class FailoverClientConnectionFactoryTests { Mockito.verify(conn1, times(2)).send(message); } - @Test(expected = UncheckedIOException.class) + @Test public void testFailoverConnectNone() throws Exception { AbstractClientConnectionFactory factory1 = mock(AbstractClientConnectionFactory.class); AbstractClientConnectionFactory factory2 = mock(AbstractClientConnectionFactory.class); @@ -242,7 +288,8 @@ public class FailoverClientConnectionFactoryTests { FailoverClientConnectionFactory failoverFactory = new FailoverClientConnectionFactory(factories); failoverFactory.start(); GenericMessage message = new GenericMessage("foo"); - failoverFactory.getConnection().send(message); + assertThatExceptionOfType(UncheckedIOException.class).isThrownBy(() -> + failoverFactory.getConnection().send(message)); } @Test