Geode test container will not start locator by default
This commit is contained in:
@@ -16,17 +16,17 @@ The **$$cassandra$$** $$sink$$ has the following options:
|
||||
//tag::configuration-properties[]
|
||||
$$spring.data.cassandra.cluster-name$$:: $$<documentation missing>$$ *($$String$$, default: `$$<none>$$`)*
|
||||
$$spring.data.cassandra.compression$$:: $$Compression supported by the Cassandra binary protocol.$$ *($$Compression$$, default: `$$none$$`, possible values: `LZ4`,`SNAPPY`,`NONE`)*
|
||||
$$spring.data.cassandra.connect-timeout$$:: $$Socket option: connection time out.$$ *($$Duration$$, default: `$$<none>$$`)*
|
||||
$$spring.data.cassandra.consistency-level$$:: $$Queries consistency level.$$ *($$DefaultConsistencyLevel$$, default: `$$<none>$$`, possible values: `ANY`,`ONE`,`TWO`,`THREE`,`QUORUM`,`ALL`,`LOCAL_ONE`,`LOCAL_QUORUM`,`EACH_QUORUM`,`SERIAL`,`LOCAL_SERIAL`)*
|
||||
$$spring.data.cassandra.connect-timeout$$:: $$<documentation missing>$$ *($$Duration$$, default: `$$<none>$$`)*
|
||||
$$spring.data.cassandra.consistency-level$$:: $$<documentation missing>$$ *($$DefaultConsistencyLevel$$, default: `$$<none>$$`, possible values: `ANY`,`ONE`,`TWO`,`THREE`,`QUORUM`,`ALL`,`LOCAL_ONE`,`LOCAL_QUORUM`,`EACH_QUORUM`,`SERIAL`,`LOCAL_SERIAL`)*
|
||||
$$spring.data.cassandra.contact-points$$:: $$Cluster node addresses in the form 'host:port', or a simple 'host' to use the configured port.$$ *($$List<String>$$, default: `$$[127.0.0.1:9042]$$`)*
|
||||
$$spring.data.cassandra.fetch-size$$:: $$<documentation missing>$$ *($$Integer$$, default: `$$<none>$$`)*
|
||||
$$spring.data.cassandra.keyspace-name$$:: $$Keyspace name to use.$$ *($$String$$, default: `$$<none>$$`)*
|
||||
$$spring.data.cassandra.local-datacenter$$:: $$Datacenter that is considered "local". Contact points should be from this datacenter.$$ *($$String$$, default: `$$<none>$$`)*
|
||||
$$spring.data.cassandra.page-size$$:: $$Queries default page size.$$ *($$Integer$$, default: `$$5000$$`)*
|
||||
$$spring.data.cassandra.password$$:: $$Login password of the server.$$ *($$String$$, default: `$$<none>$$`)*
|
||||
$$spring.data.cassandra.read-timeout$$:: $$Socket option: read time out.$$ *($$Duration$$, default: `$$<none>$$`)*
|
||||
$$spring.data.cassandra.port$$:: $$Port to use if a contact point does not specify one.$$ *($$Integer$$, default: `$$9042$$`)*
|
||||
$$spring.data.cassandra.read-timeout$$:: $$<documentation missing>$$ *($$Duration$$, default: `$$<none>$$`)*
|
||||
$$spring.data.cassandra.schema-action$$:: $$Schema action to take at startup.$$ *($$String$$, default: `$$none$$`)*
|
||||
$$spring.data.cassandra.serial-consistency-level$$:: $$Queries serial consistency level.$$ *($$DefaultConsistencyLevel$$, default: `$$<none>$$`, possible values: `ANY`,`ONE`,`TWO`,`THREE`,`QUORUM`,`ALL`,`LOCAL_ONE`,`LOCAL_QUORUM`,`EACH_QUORUM`,`SERIAL`,`LOCAL_SERIAL`)*
|
||||
$$spring.data.cassandra.serial-consistency-level$$:: $$<documentation missing>$$ *($$DefaultConsistencyLevel$$, default: `$$<none>$$`, possible values: `ANY`,`ONE`,`TWO`,`THREE`,`QUORUM`,`ALL`,`LOCAL_ONE`,`LOCAL_QUORUM`,`EACH_QUORUM`,`SERIAL`,`LOCAL_SERIAL`)*
|
||||
$$spring.data.cassandra.session-name$$:: $$Name of the Cassandra session.$$ *($$String$$, default: `$$<none>$$`)*
|
||||
$$spring.data.cassandra.ssl$$:: $$Enable SSL support.$$ *($$Boolean$$, default: `$$false$$`)*
|
||||
$$spring.data.cassandra.username$$:: $$Login user of the server.$$ *($$String$$, default: `$$<none>$$`)*
|
||||
|
||||
@@ -67,7 +67,8 @@ public class GeodeSinkTests {
|
||||
"geode.region.regionName=Stocks",
|
||||
"geode.consumer.json=true",
|
||||
"geode.consumer.key-expression=payload.getField('symbol')",
|
||||
"geode.pool.hostAddresses=" + "localhost:" + geode.getLocatorPort())
|
||||
"geode.pool.connectType=server",
|
||||
"geode.pool.hostAddresses=" + "localhost:" + geode.getCacheServerPort())
|
||||
.run(context -> {
|
||||
InputDestination inputDestination = context.getBean(InputDestination.class);
|
||||
|
||||
@@ -88,7 +89,8 @@ public class GeodeSinkTests {
|
||||
"spring.cloud.function.definition=geodeConsumer",
|
||||
"geode.region.regionName=Stocks",
|
||||
"geode.consumer.key-expression='key'",
|
||||
"geode.pool.hostAddresses=" + "localhost:" + geode.getLocatorPort())
|
||||
"geode.pool.connectType=server",
|
||||
"geode.pool.hostAddresses=" + "localhost:" + geode.getCacheServerPort())
|
||||
.run(context -> {
|
||||
InputDestination inputDestination = context.getBean(InputDestination.class);
|
||||
inputDestination.send(new GenericMessage<>("value"));
|
||||
|
||||
@@ -70,7 +70,8 @@ public class GeodeSourceTests {
|
||||
applicationContextRunner
|
||||
.withPropertyValues("geode.region.regionName=myRegion",
|
||||
"geode.supplier.event-expression=key+':'+newValue",
|
||||
"geode.pool.hostAddresses=" + "localhost:" + geode.getLocatorPort(),
|
||||
"geode.pool.connectType=server",
|
||||
"geode.pool.hostAddresses=" + "localhost:" + geode.getCacheServerPort(),
|
||||
"spring.cloud.function.definition=geodeSupplier")
|
||||
.run(context -> {
|
||||
|
||||
@@ -101,7 +102,8 @@ public class GeodeSourceTests {
|
||||
"geode.region.regionName=myRegion",
|
||||
"geode.client.pdx-read-serialized=true",
|
||||
"geode.supplier.query=Select * from /myRegion where symbol='XXX' and price > 140",
|
||||
"geode.pool.hostAddresses=" + "localhost:" + geode.getLocatorPort())
|
||||
"geode.pool.connectType=server",
|
||||
"geode.pool.hostAddresses=" + "localhost:" + geode.getCacheServerPort())
|
||||
.run(context -> {
|
||||
OutputDestination outputDestination = context.getBean(OutputDestination.class);
|
||||
// Using local region here
|
||||
|
||||
@@ -42,16 +42,24 @@ public class GeodeContainer extends GenericContainer {
|
||||
|
||||
private final int cacheServerPort;
|
||||
|
||||
private final boolean useLocator;
|
||||
|
||||
/**
|
||||
* Create a Geode container from a Docker image.
|
||||
* @param dockerImageName the name of the image.
|
||||
* @param locatorPort the locator port.
|
||||
* @param cacheServerPort the cache server port.
|
||||
* @param useLocator set to use a locator.
|
||||
*/
|
||||
public GeodeContainer(@NonNull String dockerImageName, int locatorPort, int cacheServerPort) {
|
||||
public GeodeContainer(@NonNull String dockerImageName, int locatorPort, int cacheServerPort, boolean useLocator) {
|
||||
super(dockerImageName);
|
||||
this.locatorPort = locatorPort;
|
||||
this.cacheServerPort = cacheServerPort;
|
||||
this.useLocator = useLocator;
|
||||
}
|
||||
|
||||
public GeodeContainer(@NonNull String dockerImageName, int locatorPort, int cacheServerPort) {
|
||||
this(dockerImageName, locatorPort, cacheServerPort, false);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -61,11 +69,17 @@ public class GeodeContainer extends GenericContainer {
|
||||
* @param image the image builder.
|
||||
* @param locatorPort the locator port.
|
||||
* @param cacheServerPort the server port.
|
||||
* @param useLocator set to use a locator.
|
||||
*/
|
||||
public GeodeContainer(@NonNull Future<String> image, int locatorPort, int cacheServerPort) {
|
||||
public GeodeContainer(@NonNull Future<String> image, int locatorPort, int cacheServerPort, boolean useLocator) {
|
||||
super(image);
|
||||
this.locatorPort = locatorPort;
|
||||
this.cacheServerPort = cacheServerPort;
|
||||
this.useLocator = useLocator;
|
||||
}
|
||||
|
||||
public GeodeContainer(@NonNull Future<String> image, int locatorPort, int cacheServerPort) {
|
||||
this(image, locatorPort, cacheServerPort, false);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -73,7 +87,7 @@ public class GeodeContainer extends GenericContainer {
|
||||
* @return the connect command String.
|
||||
*/
|
||||
public String connect() {
|
||||
return "connect --locator=" + locators();
|
||||
return useLocator ? "connect --locator=" + locators() : "connect --jmx-manager=localhost[1099]";
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -16,13 +16,12 @@
|
||||
|
||||
package org.springframework.cloud.fn.test.support.geode;
|
||||
|
||||
import com.github.dockerjava.api.model.HealthCheck;
|
||||
import com.github.dockerjava.api.model.HostConfig;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import com.github.dockerjava.api.command.CreateContainerCmd;
|
||||
import com.github.dockerjava.api.model.ExposedPort;
|
||||
import com.github.dockerjava.api.model.HostConfig;
|
||||
import com.github.dockerjava.api.model.PortBinding;
|
||||
import com.github.dockerjava.api.model.Ports;
|
||||
import org.testcontainers.images.builder.ImageFromDockerfile;
|
||||
@@ -44,12 +43,20 @@ public class GeodeContainerIntializer {
|
||||
|
||||
private Optional<Consumer<GeodeContainer>> postProcessor;
|
||||
|
||||
private final boolean useLocator;
|
||||
|
||||
/**
|
||||
* Create, start, and perform post processing on a {@link GeodeContainer}.
|
||||
* @param postProcessor a {@code Consumer<GeodeContainer>} to run after the container is
|
||||
* started.
|
||||
*/
|
||||
public GeodeContainerIntializer(Consumer<GeodeContainer> postProcessor) {
|
||||
this(postProcessor, false);
|
||||
}
|
||||
|
||||
public GeodeContainerIntializer(Consumer<GeodeContainer> postProcessor, boolean useLocator) {
|
||||
this.useLocator = useLocator;
|
||||
|
||||
cacheServerPort = SocketUtils.findAvailableTcpPort();
|
||||
|
||||
locatorPort = SocketUtils.findAvailableTcpPort();
|
||||
@@ -60,7 +67,7 @@ public class GeodeContainerIntializer {
|
||||
.withFileFromClasspath("Dockerfile", "geode/Dockerfile")
|
||||
.withBuildArg("CACHE_SERVER_PORT", String.valueOf(cacheServerPort))
|
||||
.withBuildArg("LOCATOR_PORT", String.valueOf(locatorPort)),
|
||||
locatorPort, cacheServerPort);
|
||||
locatorPort, cacheServerPort, useLocator);
|
||||
startContainer();
|
||||
}
|
||||
|
||||
@@ -68,7 +75,7 @@ public class GeodeContainerIntializer {
|
||||
* Create and start a {@link GeodeContainer}.
|
||||
*/
|
||||
public GeodeContainerIntializer() {
|
||||
this(null);
|
||||
this(null, false);
|
||||
}
|
||||
|
||||
private void startContainer() {
|
||||
@@ -84,9 +91,16 @@ public class GeodeContainerIntializer {
|
||||
|
||||
geode.withCommand("tail", "-f", "/dev/null").withCreateContainerCmdModifier(cmd).start();
|
||||
|
||||
geode.execGfsh("start locator --name=Locator1 --hostname-for-clients=localhost --port=" + locatorPort);
|
||||
geode.execGfsh("connect --locator=" + geode.locators(),
|
||||
"start server --name=Server1 --hostname-for-clients=localhost --server-port=" + cacheServerPort);
|
||||
if (useLocator) {
|
||||
geode.execGfsh("start locator --name=Locator1 --hostname-for-clients=localhost --port=" + locatorPort);
|
||||
geode.execGfsh(geode.connect(),
|
||||
"start server --name=Server1 --hostname-for-clients=localhost --server-port=" + cacheServerPort);
|
||||
}
|
||||
else {
|
||||
geode.execGfsh(
|
||||
"start server --name=Server1 --hostname-for-clients=localhost --server-port=" + cacheServerPort +
|
||||
" --J=-Dgemfire.jmx-manager=true --J=-Dgemfire.jmx-manager-start=true");
|
||||
}
|
||||
postProcessor.ifPresent(geodeContainerConsumer -> geodeContainerConsumer.accept(geode));
|
||||
}
|
||||
|
||||
|
||||
@@ -64,7 +64,8 @@ public class GeodeConsumerApplicationTests {
|
||||
"geode.region.regionName=Stocks",
|
||||
"geode.consumer.json=true",
|
||||
"geode.consumer.key-expression=payload.getField('symbol')",
|
||||
"geode.pool.hostAddresses=" + "localhost:" + geode.getLocatorPort())
|
||||
"geode.pool.connectType=server",
|
||||
"geode.pool.hostAddresses=" + "localhost:" + geode.getCacheServerPort())
|
||||
.run(context -> {
|
||||
Consumer<Message<?>> geodeConsumer = context.getBean("geodeConsumer", Consumer.class);
|
||||
|
||||
@@ -84,7 +85,8 @@ public class GeodeConsumerApplicationTests {
|
||||
.withPropertyValues(
|
||||
"geode.region.regionName=Stocks",
|
||||
"geode.consumer.key-expression='key'",
|
||||
"geode.pool.hostAddresses=" + "localhost:" + geode.getLocatorPort())
|
||||
"geode.pool.connectType=server",
|
||||
"geode.pool.hostAddresses=" + "localhost:" + geode.getCacheServerPort())
|
||||
.run(context -> {
|
||||
Consumer<Message<?>> geodeConsumer = context.getBean("geodeConsumer", Consumer.class);
|
||||
|
||||
|
||||
@@ -47,7 +47,7 @@ public class GeodeSupplierApplicationTests {
|
||||
|
||||
private static GeodeContainer geode;
|
||||
|
||||
private ObjectMapper objectMapper = new ObjectMapper();
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
@BeforeAll
|
||||
static void setup() {
|
||||
@@ -67,12 +67,13 @@ public class GeodeSupplierApplicationTests {
|
||||
applicationContextRunner
|
||||
.withPropertyValues("geode.region.regionName=myRegion",
|
||||
"geode.supplier.event-expression=#root",
|
||||
"geode.pool.hostAddresses=" + "localhost:" + geode.getLocatorPort())
|
||||
"geode.pool.connectType=server",
|
||||
"geode.pool.hostAddresses=" + "localhost:" + geode.getCacheServerPort())
|
||||
.run(context -> {
|
||||
geode.connectAndExecGfsh(
|
||||
"put --key=hello --value=world --region=myRegion",
|
||||
"put --key=foo --value=bar --region=myRegion",
|
||||
"put --key=hello --value=dave --region=myRegion");
|
||||
Region region = context.getBean(Region.class);
|
||||
region.put("hello", "world");
|
||||
region.put("foo", "bar");
|
||||
region.replace("hello", "dave");
|
||||
|
||||
Supplier<Flux<EntryEvent>> geodeSupplier = context.getBean("geodeSupplier", Supplier.class);
|
||||
|
||||
@@ -99,7 +100,8 @@ public class GeodeSupplierApplicationTests {
|
||||
.withPropertyValues(
|
||||
"geode.region.regionName=myRegion",
|
||||
"geode.client.pdx-read-serialized=true",
|
||||
"geode.pool.hostAddresses=" + "localhost:" + geode.getLocatorPort())
|
||||
"geode.pool.connectType=server",
|
||||
"geode.pool.hostAddresses=" + "localhost:" + geode.getCacheServerPort())
|
||||
.run(context -> {
|
||||
Supplier<Flux<String>> geodeSupplier = context.getBean("geodeSupplier", Supplier.class);
|
||||
// Using local region here
|
||||
@@ -120,27 +122,6 @@ public class GeodeSupplierApplicationTests {
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void connectTypeServer() {
|
||||
applicationContextRunner
|
||||
.withPropertyValues("geode.region.regionName=myRegion",
|
||||
"geode.pool.connect-type=server",
|
||||
"geode.supplier.event-expression=key+':'+newValue",
|
||||
"geode.pool.hostAddresses=" + "localhost:" + geode.getCacheServerPort())
|
||||
.run(context -> {
|
||||
|
||||
// Using local region here since it's faster
|
||||
Region<String, String> region = context.getBean(Region.class);
|
||||
|
||||
region.put("foo", "bar");
|
||||
Supplier<Flux<String>> geodeSupplier = context.getBean("geodeSupplier", Supplier.class);
|
||||
|
||||
StepVerifier.create(geodeSupplier.get()).assertNext(val -> {
|
||||
assertThat(val).isEqualTo("foo:bar");
|
||||
}).thenCancel().verify(Duration.ofSeconds(10));
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void continuousQuery() {
|
||||
applicationContextRunner
|
||||
@@ -148,7 +129,8 @@ public class GeodeSupplierApplicationTests {
|
||||
"geode.region.regionName=myRegion",
|
||||
"geode.client.pdx-read-serialized=true",
|
||||
"geode.supplier.query=Select * from /myRegion where symbol='XXX' and price > 140",
|
||||
"geode.pool.hostAddresses=" + "localhost:" + geode.getLocatorPort())
|
||||
"geode.pool.connectType=server",
|
||||
"geode.pool.hostAddresses=" + "localhost:" + geode.getCacheServerPort())
|
||||
.run(context -> {
|
||||
Supplier<Flux<String>> geodeCqSupplier = context.getBean("geodeSupplier", Supplier.class);
|
||||
// Using local region here
|
||||
|
||||
Reference in New Issue
Block a user