diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/ContainerTestUtils.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/ContainerTestUtils.java new file mode 100644 index 00000000..9520c932 --- /dev/null +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/ContainerTestUtils.java @@ -0,0 +1,114 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.test.utils; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.lang.reflect.Method; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import org.springframework.util.ReflectionUtils; +import org.springframework.util.ReflectionUtils.MethodCallback; +import org.springframework.util.ReflectionUtils.MethodFilter; + +/** + * Utilities for testing listener containers. No hard references to container + * classes are used to avoid circular project dependencies. + * + * @author Gary Russell + * @since 1.0.3 + */ +public final class ContainerTestUtils { + + private ContainerTestUtils() { + // private ctor + } + + /** + * Wait until the container has the required number of assigned partitions. + * @param container the container. + * @param partitions the number of partitions. + * @throws Exception an exception. + */ + public static void waitForAssignment(Object container, int partitions) + throws Exception { + if (container.getClass().getSimpleName().equals("KafkaMessageListenerContainer")) { + waitForSingleContainerAssignment(container, partitions); + return; + } + List containers = KafkaTestUtils.getPropertyValue(container, "containers", List.class); + int n = 0; + int count = 0; + Method getAssignedPartitions = null; + while (n++ < 600 && count < partitions) { + count = 0; + for (Object aContainer : containers) { + if (getAssignedPartitions == null) { + getAssignedPartitions = getAssignedPartitionsMethod(aContainer.getClass()); + } + Collection assignedPartitions = (Collection) getAssignedPartitions.invoke(aContainer); + if (assignedPartitions != null) { + count += assignedPartitions.size(); + } + } + if (count < partitions) { + Thread.sleep(100); + } + } + assertThat(count).isEqualTo(partitions); + } + + private static void waitForSingleContainerAssignment(Object container, int partitions) + throws Exception { + int n = 0; + int count = 0; + Method getAssignedPartitions = getAssignedPartitionsMethod(container.getClass()); + while (n++ < 600 && count < partitions) { + count = 0; + Collection assignedPartitions = (Collection) getAssignedPartitions.invoke(container); + if (assignedPartitions != null) { + count = assignedPartitions.size(); + } + if (count < partitions) { + Thread.sleep(100); + } + } + assertThat(count).isEqualTo(partitions); + } + + private static Method getAssignedPartitionsMethod(Class clazz) { + final AtomicReference theMethod = new AtomicReference(); + ReflectionUtils.doWithMethods(clazz, new MethodCallback() { + + @Override + public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException { + theMethod.set(method); + } + }, new MethodFilter() { + + @Override + public boolean matches(Method method) { + return method.getName().equals("getAssignedPartitions") && method.getParameterTypes().length == 0; + } + }); + assertThat(theMethod.get()).isNotNull(); + return theMethod.get(); + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java index 8148a50e..8d31737f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java @@ -60,6 +60,7 @@ import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMo import org.springframework.kafka.listener.config.ContainerProperties; import org.springframework.kafka.support.TopicPartitionInitialOffset; import org.springframework.kafka.test.rule.KafkaEmbedded; +import org.springframework.kafka.test.utils.ContainerTestUtils; import org.springframework.kafka.test.utils.KafkaTestUtils; /** diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerTestUtils.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerTestUtils.java deleted file mode 100644 index 8fc39bff..00000000 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerTestUtils.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.kafka.listener; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.util.List; - -/** - * @author Gary Russell - * - */ -public final class ContainerTestUtils { - - private ContainerTestUtils() { - // private ctor - } - - public static void waitForAssignment(ConcurrentMessageListenerContainer container, int partitions) - throws Exception { - List> containers = container.getContainers(); - int n = 0; - int count = 0; - while (n++ < 600 && count < partitions) { - count = 0; - for (KafkaMessageListenerContainer aContainer : containers) { - if (aContainer.getAssignedPartitions() != null) { - count += aContainer.getAssignedPartitions().size(); - } - } - if (count < partitions) { - Thread.sleep(100); - } - } - assertThat(count).isEqualTo(partitions); - } - - public static void waitForAssignment(KafkaMessageListenerContainer container, int partitions) - throws Exception { - int n = 0; - int count = 0; - while (n++ < 600 && count < partitions) { - count = 0; - if (container.getAssignedPartitions() != null) { - count = container.getAssignedPartitions().size(); - } - if (count < partitions) { - Thread.sleep(100); - } - } - assertThat(count).isEqualTo(partitions); - } - -} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index 8c009e84..8c556adc 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -56,6 +56,7 @@ import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMo import org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter; import org.springframework.kafka.listener.config.ContainerProperties; import org.springframework.kafka.test.rule.KafkaEmbedded; +import org.springframework.kafka.test.utils.ContainerTestUtils; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.retry.backoff.FixedBackOffPolicy; import org.springframework.retry.policy.SimpleRetryPolicy;