GH-157: Move ContainerTestUtils to s-k-test

Resolves #157

Add `@since 1.0.3` into newly created class
This commit is contained in:
Gary Russell
2016-07-29 14:56:54 -04:00
committed by Artem Bilan
parent 06772d0829
commit ad62b09732
4 changed files with 116 additions and 68 deletions

View File

@@ -0,0 +1,114 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.kafka.test.utils;
import static org.assertj.core.api.Assertions.assertThat;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.ReflectionUtils.MethodCallback;
import org.springframework.util.ReflectionUtils.MethodFilter;
/**
* Utilities for testing listener containers. No hard references to container
* classes are used to avoid circular project dependencies.
*
* @author Gary Russell
* @since 1.0.3
*/
public final class ContainerTestUtils {
private ContainerTestUtils() {
// private ctor
}
/**
* Wait until the container has the required number of assigned partitions.
* @param container the container.
* @param partitions the number of partitions.
* @throws Exception an exception.
*/
public static void waitForAssignment(Object container, int partitions)
throws Exception {
if (container.getClass().getSimpleName().equals("KafkaMessageListenerContainer")) {
waitForSingleContainerAssignment(container, partitions);
return;
}
List<?> containers = KafkaTestUtils.getPropertyValue(container, "containers", List.class);
int n = 0;
int count = 0;
Method getAssignedPartitions = null;
while (n++ < 600 && count < partitions) {
count = 0;
for (Object aContainer : containers) {
if (getAssignedPartitions == null) {
getAssignedPartitions = getAssignedPartitionsMethod(aContainer.getClass());
}
Collection<?> assignedPartitions = (Collection<?>) getAssignedPartitions.invoke(aContainer);
if (assignedPartitions != null) {
count += assignedPartitions.size();
}
}
if (count < partitions) {
Thread.sleep(100);
}
}
assertThat(count).isEqualTo(partitions);
}
private static void waitForSingleContainerAssignment(Object container, int partitions)
throws Exception {
int n = 0;
int count = 0;
Method getAssignedPartitions = getAssignedPartitionsMethod(container.getClass());
while (n++ < 600 && count < partitions) {
count = 0;
Collection<?> assignedPartitions = (Collection<?>) getAssignedPartitions.invoke(container);
if (assignedPartitions != null) {
count = assignedPartitions.size();
}
if (count < partitions) {
Thread.sleep(100);
}
}
assertThat(count).isEqualTo(partitions);
}
private static Method getAssignedPartitionsMethod(Class<?> clazz) {
final AtomicReference<Method> theMethod = new AtomicReference<Method>();
ReflectionUtils.doWithMethods(clazz, new MethodCallback() {
@Override
public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {
theMethod.set(method);
}
}, new MethodFilter() {
@Override
public boolean matches(Method method) {
return method.getName().equals("getAssignedPartitions") && method.getParameterTypes().length == 0;
}
});
assertThat(theMethod.get()).isNotNull();
return theMethod.get();
}
}

View File

@@ -60,6 +60,7 @@ import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMo
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.kafka.test.utils.KafkaTestUtils;
/**

View File

@@ -1,68 +0,0 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.kafka.listener;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.List;
/**
* @author Gary Russell
*
*/
public final class ContainerTestUtils {
private ContainerTestUtils() {
// private ctor
}
public static void waitForAssignment(ConcurrentMessageListenerContainer<Integer, String> container, int partitions)
throws Exception {
List<KafkaMessageListenerContainer<Integer, String>> containers = container.getContainers();
int n = 0;
int count = 0;
while (n++ < 600 && count < partitions) {
count = 0;
for (KafkaMessageListenerContainer<Integer, String> aContainer : containers) {
if (aContainer.getAssignedPartitions() != null) {
count += aContainer.getAssignedPartitions().size();
}
}
if (count < partitions) {
Thread.sleep(100);
}
}
assertThat(count).isEqualTo(partitions);
}
public static void waitForAssignment(KafkaMessageListenerContainer<Integer, String> container, int partitions)
throws Exception {
int n = 0;
int count = 0;
while (n++ < 600 && count < partitions) {
count = 0;
if (container.getAssignedPartitions() != null) {
count = container.getAssignedPartitions().size();
}
if (count < partitions) {
Thread.sleep(100);
}
}
assertThat(count).isEqualTo(partitions);
}
}

View File

@@ -56,6 +56,7 @@ import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMo
import org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;