SGF-497 - Intermittent failures in DurableClientCacheIntegrationTest

(cherry picked from commit 9c47e4b61765093e048dfd23a76b0df394c457f7)
Signed-off-by: John Blum <jblum@pivotal.io>
This commit is contained in:
John Blum
2016-05-16 17:44:21 -07:00
parent b42442d9cd
commit b4ecdf2cc1
6 changed files with 155 additions and 78 deletions

View File

@@ -53,7 +53,7 @@ public class ClientCachePoolTests extends AbstractGemFireClientServerIntegration
@BeforeClass
public static void setupGemFireServer() throws Exception {
gemfireServerProcess = runGemFireServer(ClientCachePoolTests.class);
gemfireServerProcess = startGemFireServer(ClientCachePoolTests.class);
}
@AfterClass

View File

@@ -21,7 +21,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo;
import static org.junit.Assert.assertThat;
import static org.junit.Assume.assumeThat;
import static org.junit.Assume.assumeTrue;
import java.io.IOException;
import java.util.ArrayList;
@@ -29,8 +29,9 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Resource;
import org.junit.After;
@@ -52,6 +53,7 @@ import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.util.Assert;
import org.springframework.util.SocketUtils;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.EntryEvent;
@@ -84,19 +86,23 @@ import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
@SuppressWarnings("all")
public class DurableClientCacheIntegrationTest extends AbstractGemFireClientServerIntegrationTest {
private static final int SERVER_PORT = 24842;
private static int serverPort;
private static final AtomicInteger RUN_COUNT = new AtomicInteger(1);
private static AtomicBoolean DIRTIES_CONTEXT = new AtomicBoolean(false);
private static List<Integer> regionCacheListenerEventValues =
Collections.synchronizedList(new ArrayList<Integer>(5));
Collections.synchronizedList(new ArrayList<Integer>());
private static ProcessWrapper serverProcess;
private static final String CLIENT_CACHE_INTERESTS_RESULT_POLICY_SYSTEM_PROPERTY =
"gemfire.cache.client.interests.result-policy";
private static final String CACHE_SERVER_PORT_SYSTEM_PROPERTY =
DurableClientCacheIntegrationTest.class.getName().concat(".cache-server-port");
private static final String DURABLE_CLIENT_TIMEOUT_SYSTEM_PROPERTY = "gemfire.cache.client.durable-client-timeout";
private static final String CLIENT_CACHE_INTERESTS_RESULT_POLICY_SYSTEM_PROPERTY =
DurableClientCacheIntegrationTest.class.getName().concat(".interests-result-policy");
private static final String DURABLE_CLIENT_TIMEOUT_SYSTEM_PROPERTY =
DurableClientCacheIntegrationTest.class.getName().concat(".durable-client-timeout");
private static final String SERVER_HOST = "localhost";
@@ -111,35 +117,53 @@ public class DurableClientCacheIntegrationTest extends AbstractGemFireClientServ
@BeforeClass
public static void setupGemFireServer() throws IOException {
serverProcess = runGemFireServer(DurableClientCacheIntegrationTest.class);
serverPort = setSystemProperty(CACHE_SERVER_PORT_SYSTEM_PROPERTY, SocketUtils.findAvailableTcpPort());
serverProcess = startGemFireServer(DurableClientCacheIntegrationTest.class);
}
@AfterClass
public static void tearDownGemFireServer() {
stopGemFireServer(serverProcess);
serverProcess = null;
serverProcess = stopGemFireServer(serverProcess);
clearSystemProperties(DurableClientCacheIntegrationTest.class);
}
protected static boolean isAfterDirtiesContext() {
return DIRTIES_CONTEXT.get();
}
protected static boolean isBeforeDirtiesContext() {
return !isAfterDirtiesContext();
}
protected boolean dirtiesContext() {
return !DIRTIES_CONTEXT.getAndSet(true);
}
protected <T> T valueBeforeAndAfterDirtiesContext(T before, T after) {
return (isBeforeDirtiesContext() ? before : after);
}
@Before
public void setup() {
assertThat(clientCache.getDistributedSystem().getProperties().getProperty(
Properties distributedSystemProperties = clientCache.getDistributedSystem().getProperties();
assertThat(distributedSystemProperties.getProperty(
DistributedSystemUtils.DURABLE_CLIENT_ID_PROPERTY_NAME),
is(equalTo(DurableClientCacheIntegrationTest.class.getSimpleName())));
assertThat(clientCache.getDistributedSystem().getProperties().getProperty(
DistributedSystemUtils.DURABLE_CLIENT_TIMEOUT_PROPERTY_NAME),
is(equalTo(RUN_COUNT.get() == 1 ? "300" : "600")));
is(equalTo(valueBeforeAndAfterDirtiesContext("300", "600"))));
assertRegion(example, "Example", DataPolicy.NORMAL);
}
@After
public void tearDown() {
if (RUN_COUNT.get() == 1) {
if (dirtiesContext()) {
closeApplicationContext();
runClientCacheProducer();
setSystemProperties();
RUN_COUNT.incrementAndGet();
}
regionCacheListenerEventValues.clear();
@@ -155,7 +179,7 @@ public class DurableClientCacheIntegrationTest extends AbstractGemFireClientServ
protected void runClientCacheProducer() {
try {
ClientCache gemfireClientCache = new ClientCacheFactory()
.addPoolServer(SERVER_HOST, SERVER_PORT)
.addPoolServer(SERVER_HOST, serverPort)
.set("name", "ClientCacheProducer")
.set("mcast-port", "0")
.set("log-level", "warning")
@@ -177,6 +201,29 @@ public class DurableClientCacheIntegrationTest extends AbstractGemFireClientServ
System.setProperty(DURABLE_CLIENT_TIMEOUT_SYSTEM_PROPERTY, "600");
}
protected void assertRegion(Region<?, ?> region, String expectedName, DataPolicy expectedDataPolicy) {
assertRegion(region, expectedName, String.format("%1$s%2$s", Region.SEPARATOR, expectedName),
expectedDataPolicy);
}
protected void assertRegion(Region<?, ?> region, String expectedName, String expectedPath,
DataPolicy expectedDataPolicy) {
assertThat(region, is(notNullValue()));
assertThat(region.getName(), is(equalTo(expectedName)));
assertThat(region.getFullPath(), is(equalTo(expectedPath)));
assertThat(region.getAttributes(), is(notNullValue()));
assertThat(region.getAttributes().getDataPolicy(), is(equalTo(expectedDataPolicy)));
}
protected void assertRegionValues(Region<?, ?> region, Object... values) {
assertThat(region.size(), is(equalTo(values.length)));
for (Object value : values) {
assertThat(region.containsValue(value), is(true));
}
}
protected void waitForRegionEntryEvents() {
ThreadUtils.timedWait(TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(500),
new ThreadUtils.WaitCondition() {
@@ -187,38 +234,17 @@ public class DurableClientCacheIntegrationTest extends AbstractGemFireClientServ
);
}
protected void assertRegion(Region<?, ?> region, String expectedName, DataPolicy expectedDataPolicy) {
assertRegion(region, expectedName, String.format("%1$s%2$s", Region.SEPARATOR, expectedName),
expectedDataPolicy);
}
protected void assertRegion(Region<?, ?> region, String expectedName, String expectedPath, DataPolicy expectedDataPolicy) {
assertThat(region, is(notNullValue()));
assertThat(region.getName(), is(equalTo(expectedName)));
assertThat(region.getFullPath(), is(equalTo(expectedPath)));
assertThat(region.getAttributes(), is(notNullValue()));
assertThat(region.getAttributes().getDataPolicy(), is(equalTo(expectedDataPolicy)));
}
protected void assertRegionContents(Region<?, ?> region, Object... values) {
assertThat(region.size(), is(equalTo(values.length)));
for (Object value : values) {
assertThat(region.containsValue(value), is(true));
}
}
@Test
@DirtiesContext
public void durableClientGetsInitializedWithDataOnServer() {
assumeThat(RUN_COUNT.get(), is(equalTo(1)));
assertRegionContents(example, 1, 2, 3);
assumeTrue(isBeforeDirtiesContext());
assertRegionValues(example, 1, 2, 3);
assertThat(regionCacheListenerEventValues.isEmpty(), is(true));
}
@Test
public void durableClientGetsUpdatesFromServerWhileClientWasOffline() {
assumeThat(RUN_COUNT.get(), is(equalTo(2)));
assumeTrue(isAfterDirtiesContext());
assertThat(example.isEmpty(), is(true));
waitForRegionEntryEvents();
@@ -238,14 +264,16 @@ public class DurableClientCacheIntegrationTest extends AbstractGemFireClientServ
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof ClientCache) {
if (RUN_COUNT.get() == 1) {
assertThat(((ClientCache) bean).getDefaultPool().getPendingEventCount(), is(equalTo(-2)));
ClientCache clientCache = (ClientCache) bean;
if (isBeforeDirtiesContext()) {
// NOTE: A value of -2 indicates the client connected to the server for the first time...
assertThat(clientCache.getDefaultPool().getPendingEventCount(), is(equalTo(-2)));
}
else {
// NOTE pending event count is possibly 3 because it includes the 2 puts from the client cache producer
// as well as the "marker"
assertThat(((ClientCache) bean).getDefaultPool().getPendingEventCount(), is(greaterThanOrEqualTo(2)));
pause(TimeUnit.SECONDS.toMillis(3));
// NOTE: the pending event count could be 3 because it should minimally include the 2 puts
// from the client cache producer and possibly a "marker" as well...
assertThat(clientCache.getDefaultPool().getPendingEventCount(), is(greaterThanOrEqualTo(2)));
}
}
@@ -259,7 +287,7 @@ public class DurableClientCacheIntegrationTest extends AbstractGemFireClientServ
private final String regionName;
public RegionDataLoadingBeanPostProcessor(final String regionName) {
public RegionDataLoadingBeanPostProcessor(String regionName) {
Assert.hasText(regionName, "Region name must be specified");
this.regionName = regionName;
}
@@ -303,9 +331,8 @@ public class DurableClientCacheIntegrationTest extends AbstractGemFireClientServ
public static class RegionEntryEventRecordingCacheListener extends CacheListenerAdapter<String, Integer> {
@Override
public void afterCreate(final EntryEvent<String, Integer> event) {
public void afterCreate(EntryEvent<String, Integer> event) {
regionCacheListenerEventValues.add(event.getNewValue());
}
}
}

View File

@@ -119,13 +119,13 @@ public abstract class ProcessExecutor {
return springGemfireSystemProperties;
}
protected static boolean isJvmOption(final String option) {
return (!StringUtils.isEmpty(option) && (option.startsWith("-D") || option.startsWith("-X")));
protected static boolean isJvmOption(String option) {
return (StringUtils.hasText(option) && (option.startsWith("-D") || option.startsWith("-X")));
}
protected static File validateDirectory(final File workingDirectory) {
Assert.isTrue(workingDirectory != null && (workingDirectory.isDirectory() || workingDirectory.mkdirs()));
protected static File validateDirectory(File workingDirectory) {
Assert.isTrue(workingDirectory != null && (workingDirectory.isDirectory() || workingDirectory.mkdirs()),
String.format("Failed to create working directory [%1$s]", workingDirectory));
return workingDirectory;
}
}

View File

@@ -42,11 +42,11 @@ import org.springframework.util.Assert;
@SuppressWarnings("unused")
public abstract class AbstractGemFireClientServerIntegrationTest {
protected static long DEFAULT_TIME_TO_WAIT_FOR_SERVER_TO_START = TimeUnit.SECONDS.toMillis(20);
protected static long FIVE_HUNDRED_MILLISECONDS = TimeUnit.MILLISECONDS.toMillis(500);
protected static long ONE_SECOND_IN_MILLISECONDS = TimeUnit.SECONDS.toMillis(1);
protected static final long DEFAULT_TIME_TO_WAIT_FOR_SERVER_TO_START = TimeUnit.SECONDS.toMillis(20);
protected static final long FIVE_HUNDRED_MILLISECONDS = TimeUnit.MILLISECONDS.toMillis(500);
protected static final long ONE_SECOND_IN_MILLISECONDS = TimeUnit.SECONDS.toMillis(1);
protected static String PROCESS_WORKING_DIRECTORY_CLEAN_SYSTEM_PROPERTY = "spring.gemfire.force.clean";
protected static String CLEAN_PROCESS_WORKING_DIRECTORY_SYSTEM_PROPERTY = "spring.data.gemfire.force.clean";
protected static void pause(final long duration) {
ThreadUtils.timedWait(Math.max(duration, ONE_SECOND_IN_MILLISECONDS), ONE_SECOND_IN_MILLISECONDS,
@@ -58,12 +58,21 @@ public abstract class AbstractGemFireClientServerIntegrationTest {
);
}
protected static ProcessWrapper runGemFireServer(Class<?> testClass) throws IOException {
return runGemFireServer(testClass, DEFAULT_TIME_TO_WAIT_FOR_SERVER_TO_START);
protected static <T> T setSystemProperty(String propertyName, T propertyValue) {
System.setProperty(propertyName, String.valueOf(propertyValue));
return propertyValue;
}
protected static ProcessWrapper runGemFireServer(Class<?> testClass, long waitTimeInMilliseconds) throws IOException {
String serverName = testClass.getSimpleName() + "Server";
protected static ProcessWrapper startGemFireServer(Class<?> testClass) throws IOException {
return startGemFireServer(testClass, DEFAULT_TIME_TO_WAIT_FOR_SERVER_TO_START);
}
protected static ProcessWrapper startGemFireServer(Class<?> testClass, long waitTimeInMilliseconds)
throws IOException {
Assert.notNull(testClass, "'testClass' must not be null");
String serverName = (testClass.getSimpleName() + "Server");
File serverWorkingDirectory = new File(FileSystemUtils.WORKING_DIRECTORY, serverName.toLowerCase());
@@ -71,7 +80,7 @@ public abstract class AbstractGemFireClientServerIntegrationTest {
List<String> arguments = new ArrayList<String>();
arguments.add(String.format("-Dgemfire.name=%1$s", serverName));
addTestClassSystemProperties(testClass, arguments).add(String.format("-Dgemfire.name=%1$s", serverName));
arguments.add("/".concat(testClass.getName().replace(".", "/").concat("-server-context.xml")));
ProcessWrapper gemfireServerProcess = ProcessExecutor.launch(serverWorkingDirectory, ServerProcess.class,
@@ -85,6 +94,18 @@ public abstract class AbstractGemFireClientServerIntegrationTest {
return gemfireServerProcess;
}
static List<String> addTestClassSystemProperties(Class<?> testClass, List<String> arguments) {
String testClassName = testClass.getName();
for (String propertyName : System.getProperties().stringPropertyNames()) {
if (propertyName.startsWith(testClassName)) {
arguments.add(String.format("-D%1$s=%2$s", propertyName, System.getProperty(propertyName)));
}
}
return arguments;
}
static void waitForServerToStart(final ProcessWrapper process, long duration) {
ThreadUtils.timedWait(Math.max(duration, FIVE_HUNDRED_MILLISECONDS), FIVE_HUNDRED_MILLISECONDS,
new ThreadUtils.WaitCondition() {
@@ -98,12 +119,31 @@ public abstract class AbstractGemFireClientServerIntegrationTest {
);
}
protected static void stopGemFireServer(ProcessWrapper process) {
process.shutdown();
protected static ProcessWrapper stopGemFireServer(ProcessWrapper process) {
try {
process.shutdown();
if (Boolean.valueOf(System.getProperty(PROCESS_WORKING_DIRECTORY_CLEAN_SYSTEM_PROPERTY, Boolean.TRUE.toString()))) {
org.springframework.util.FileSystemUtils.deleteRecursively(process.getWorkingDirectory());
boolean springGemFireForceClean = Boolean.valueOf(System.getProperty(
CLEAN_PROCESS_WORKING_DIRECTORY_SYSTEM_PROPERTY, Boolean.TRUE.toString()));
if (springGemFireForceClean) {
org.springframework.util.FileSystemUtils.deleteRecursively(process.getWorkingDirectory());
}
return null;
}
catch (Exception e) {
return process;
}
}
protected static void clearSystemProperties(Class<?> testClass) {
String testClassName = testClass.getName();
for (String propertyName : System.getProperties().stringPropertyNames()) {
if (propertyName.startsWith(testClassName)) {
System.clearProperty(propertyName);
}
}
}
}

View File

@@ -13,8 +13,16 @@
<util:properties id="clientProperties">
<prop key="gemfire.cache.client.durable-client-id">DurableClientCacheIntegrationTest</prop>
<prop key="gemfire.cache.client.server.host">localhost</prop>
<prop key="gemfire.cache.client.server.port">24842</prop>
<prop key="gemfire.cache.client.durable-client-timeout">
${org.springframework.data.gemfire.client.DurableClientCacheIntegrationTest.durable-client-timeout:300}
</prop>
<prop key="gemfire.cache.client.interests.result-policy">
${org.springframework.data.gemfire.client.DurableClientCacheIntegrationTest.interests-result-policy:KEYS_VALUES}
</prop>
<prop key="gemfire.cache.server.host">localhost</prop>
<prop key="gemfire.cache.server.port">
${org.springframework.data.gemfire.client.DurableClientCacheIntegrationTest.cache-server-port}
</prop>
</util:properties>
<context:property-placeholder properties-ref="clientProperties"/>
@@ -29,20 +37,20 @@
<prop key="name">DurableClientCacheIntegrationTestClient</prop>
</util:properties>
<gfe:pool id="gemfireServerPool" keep-alive="true" subscription-enabled="true">
<gfe:server host="${gemfire.cache.client.server.host}" port="${gemfire.cache.client.server.port}"/>
</gfe:pool>
<gfe:client-cache properties-ref="gemfireProperties" pool-name="gemfireServerPool" use-bean-factory-locator="false"
<gfe:client-cache properties-ref="gemfireProperties" pool-name="gemfireServerPool"
durable-client-id="${gemfire.cache.client.durable-client-id}"
durable-client-timeout="${gemfire.cache.client.durable-client-timeout:}"
durable-client-timeout="${gemfire.cache.client.durable-client-timeout}"
keep-alive="true" ready-for-events="true"/>
<gfe:pool id="gemfireServerPool" keep-alive="true" subscription-enabled="true">
<gfe:server host="${gemfire.cache.server.host}" port="${gemfire.cache.server.port}"/>
</gfe:pool>
<gfe:client-region id="Example" pool-name="gemfireServerPool" shortcut="CACHING_PROXY">
<gfe:cache-listener>
<bean class="org.springframework.data.gemfire.client.DurableClientCacheIntegrationTest.RegionEntryEventRecordingCacheListener"/>
</gfe:cache-listener>
<gfe:regex-interest durable="true" pattern= ".*" result-policy="${gemfire.cache.client.interests.result-policy:KEYS_VALUES}"/>
<gfe:regex-interest durable="true" pattern= ".*" result-policy="${gemfire.cache.client.interests.result-policy}"/>
</gfe:client-region>
</beans>

View File

@@ -15,7 +15,9 @@
<util:properties id="serverProperties">
<prop key="gemfire.cache.server.host">localhost</prop>
<prop key="gemfire.cache.server.port">24842</prop>
<prop key="gemfire.cache.server.port">
${org.springframework.data.gemfire.client.DurableClientCacheIntegrationTest.cache-server-port}
</prop>
</util:properties>
<context:property-placeholder properties-ref="serverProperties"/>