diff --git a/pom.xml b/pom.xml
index 3bf5705a0..ce8ee5b84 100644
--- a/pom.xml
+++ b/pom.xml
@@ -273,6 +273,40 @@
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+ 1.8
+
+
+ reserve-network-port
+
+ reserve-network-port
+
+ process-resources
+
+
+ build.cassandra.native_transport_port
+ build.cassandra.rpc_port
+ build.cassandra.storage_port
+ build.cassandra.ssl_storage_port
+
+
+
+
+ add-shared-source-dir
+
+ add-test-source
+
+ generate-test-sources
+
+
+ ../shared/src/test/java
+
+
+
+
+
org.apache.maven.plugins
maven-dependency-plugin
@@ -304,6 +338,7 @@
1
-Xmx1024m -Xss512m -javaagent:${com.github.stephenc:jamm:jar}
+ true
false
**/test/integration/**/*.java
@@ -326,5 +361,31 @@
+
+
+
+ src/main/resources
+ true
+
+ **/*
+
+
+
+
+
+ src/test/resources
+ true
+
+ **/*
+
+
+
+ ../shared/src/test/resources
+ true
+
+ **/*
+
+
+
diff --git a/shared/src/test/java/org/springframework/cassandra/test/integration/support/AbstractTestJavaConfig.java b/shared/src/test/java/org/springframework/cassandra/test/integration/support/AbstractTestJavaConfig.java
new file mode 100644
index 000000000..9e957e5d5
--- /dev/null
+++ b/shared/src/test/java/org/springframework/cassandra/test/integration/support/AbstractTestJavaConfig.java
@@ -0,0 +1,16 @@
+package org.springframework.cassandra.test.integration.support;
+
+import org.springframework.cassandra.config.java.AbstractCassandraConfiguration;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public abstract class AbstractTestJavaConfig extends AbstractCassandraConfiguration {
+
+ public static BuildProperties PROPS = new BuildProperties();
+ public static final int PORT = PROPS.getCassandraPort();
+
+ @Override
+ protected int getPort() {
+ return PORT;
+ }
+}
diff --git a/shared/src/test/java/org/springframework/cassandra/test/integration/support/BuildProperties.java b/shared/src/test/java/org/springframework/cassandra/test/integration/support/BuildProperties.java
new file mode 100644
index 000000000..2a07fda77
--- /dev/null
+++ b/shared/src/test/java/org/springframework/cassandra/test/integration/support/BuildProperties.java
@@ -0,0 +1,64 @@
+package org.springframework.cassandra.test.integration.support;
+
+import java.io.InputStream;
+import java.util.Properties;
+
+@SuppressWarnings("serial")
+public class BuildProperties extends Properties {
+
+ public BuildProperties() {
+ this("/build.properties");
+ }
+
+ public BuildProperties(String resourceName) {
+ loadProperties(resourceName);
+ }
+
+ public void loadProperties(String resourceName) {
+ InputStream in = null;
+ try {
+ in = getClass().getResourceAsStream(resourceName);
+ if (in == null) {
+ return;
+ }
+ load(in);
+
+ } catch (Exception x) {
+ throw new RuntimeException(x);
+
+ } finally {
+ if (in != null) {
+ try {
+ in.close();
+ } catch (Exception e) {
+ // gulp
+ }
+ }
+ }
+ }
+
+ public int getCassandraPort() {
+ return getInt("build.cassandra.native_transport_port");
+ }
+
+ public int getCassandraRpcPort() {
+ return getInt("build.cassandra.rpc_port");
+ }
+
+ public int getCassandraStoragePort() {
+ return getInt("build.cassandra.storage_port");
+ }
+
+ public int getCassandraSslStoragePort() {
+ return getInt("build.cassandra.ssl_storage_port");
+ }
+
+ public int getInt(String key) {
+ String property = getProperty(key);
+ return Integer.parseInt(property);
+ }
+
+ public boolean getBoolean(String key) {
+ return Boolean.parseBoolean(getProperty(key));
+ }
+}
diff --git a/shared/src/test/resources/build.properties b/shared/src/test/resources/build.properties
new file mode 100644
index 000000000..ba2281c73
--- /dev/null
+++ b/shared/src/test/resources/build.properties
@@ -0,0 +1,4 @@
+build.cassandra.native_transport_port=@build.cassandra.native_transport_port@
+build.cassandra.rpc_port=@build.cassandra.rpc_port@
+build.cassandra.storage_port=@build.cassandra.storage_port@
+build.cassandra.ssl_storage_port=@build.cassandra.ssl_storage_port@
diff --git a/spring-cassandra/pom.xml b/spring-cassandra/pom.xml
index f675f2771..26dd63c0b 100644
--- a/spring-cassandra/pom.xml
+++ b/spring-cassandra/pom.xml
@@ -41,6 +41,21 @@
org.springframework
spring-tx
+
+ ${project.groupId}
+ spring-data-commons
+ ${springdata.commons}
+
+
+ com.datastax.cassandra
+ cassandra-driver-core
+
+
+ log4j
+ log4j
+
+
+
javax.enterprise
cdi-api
diff --git a/spring-cassandra/src/main/java/org/springframework/cassandra/config/CassandraClusterFactoryBean.java b/spring-cassandra/src/main/java/org/springframework/cassandra/config/CassandraClusterFactoryBean.java
index cdd63f6c4..acb7375af 100644
--- a/spring-cassandra/src/main/java/org/springframework/cassandra/config/CassandraClusterFactoryBean.java
+++ b/spring-cassandra/src/main/java/org/springframework/cassandra/config/CassandraClusterFactoryBean.java
@@ -16,8 +16,10 @@
package org.springframework.cassandra.config;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,7 +31,7 @@ import org.springframework.cassandra.core.cql.generator.CreateKeyspaceCqlGenerat
import org.springframework.cassandra.core.cql.generator.DropKeyspaceCqlGenerator;
import org.springframework.cassandra.core.keyspace.CreateKeyspaceSpecification;
import org.springframework.cassandra.core.keyspace.DropKeyspaceSpecification;
-import org.springframework.cassandra.core.keyspace.KeyspaceNameSpecification;
+import org.springframework.cassandra.core.keyspace.KeyspaceActionSpecification;
import org.springframework.cassandra.support.CassandraExceptionTranslator;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.support.PersistenceExceptionTranslator;
@@ -37,9 +39,11 @@ import org.springframework.util.StringUtils;
import com.datastax.driver.core.AuthProvider;
import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.LatencyTracker;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.ProtocolOptions.Compression;
+import com.datastax.driver.core.SSLOptions;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
@@ -51,32 +55,44 @@ import com.datastax.driver.core.policies.RetryPolicy;
*
* @author Alex Shvid
* @author Matthew T. Adams
+ * @author David Webb
*/
public class CassandraClusterFactoryBean implements FactoryBean, InitializingBean, DisposableBean,
PersistenceExceptionTranslator {
public static final String DEFAULT_CONTACT_POINTS = "localhost";
public static final boolean DEFAULT_METRICS_ENABLED = true;
+ public static final boolean DEFAULT_DEFERRED_INITIALIZATION = false;
+ public static final boolean DEFAULT_JMX_REPORTING_ENABLED = true;
+ public static final boolean DEFAULT_SSL_ENABLED = false;
public static final int DEFAULT_PORT = 9042;
protected static final Logger log = LoggerFactory.getLogger(CassandraClusterFactoryBean.class);
private Cluster cluster;
- /**
- * Comma-delimited string of servers.
+ /*
+ * Attributes needed for cluster builder
*/
private String contactPoints = DEFAULT_CONTACT_POINTS;
private int port = CassandraClusterFactoryBean.DEFAULT_PORT;
private CompressionType compressionType;
- private PoolingOptionsConfig localPoolingOptions;
- private PoolingOptionsConfig remotePoolingOptions;
- private SocketOptionsConfig socketOptions;
+ private PoolingOptions poolingOptions;
+ private SocketOptions socketOptions;
private AuthProvider authProvider;
+ private String username;
+ private String password;
private LoadBalancingPolicy loadBalancingPolicy;
private ReconnectionPolicy reconnectionPolicy;
private RetryPolicy retryPolicy;
private boolean metricsEnabled = DEFAULT_METRICS_ENABLED;
+ private boolean deferredInitialization = DEFAULT_DEFERRED_INITIALIZATION;
+ private boolean jmxReportingEnabled = DEFAULT_JMX_REPORTING_ENABLED;
+ private boolean sslEnabled = DEFAULT_SSL_ENABLED;
+ private SSLOptions sslOptions;
+ private Host.StateListener hostStateListener;
+ private LatencyTracker latencyTracker;
+ private Set> keyspaceSpecifications = new HashSet>();
private List keyspaceCreations = new ArrayList();
private List keyspaceDrops = new ArrayList();
private List startupScripts = new ArrayList();
@@ -119,20 +135,20 @@ public class CassandraClusterFactoryBean implements FactoryBean, Initia
builder.withCompression(convertCompressionType(compressionType));
}
- if (localPoolingOptions != null) {
- builder.withPoolingOptions(configPoolingOptions(HostDistance.LOCAL, localPoolingOptions));
- }
-
- if (remotePoolingOptions != null) {
- builder.withPoolingOptions(configPoolingOptions(HostDistance.REMOTE, remotePoolingOptions));
+ if (poolingOptions != null) {
+ builder.withPoolingOptions(poolingOptions);
}
if (socketOptions != null) {
- builder.withSocketOptions(configSocketOptions(socketOptions));
+ builder.withSocketOptions(socketOptions);
}
if (authProvider != null) {
builder.withAuthProvider(authProvider);
+
+ if (username != null) {
+ builder.withCredentials(username, password);
+ }
}
if (loadBalancingPolicy != null) {
@@ -147,14 +163,60 @@ public class CassandraClusterFactoryBean implements FactoryBean, Initia
builder.withRetryPolicy(retryPolicy);
}
+ if (deferredInitialization) {
+ builder.withDeferredInitialization();
+ }
+
if (!metricsEnabled) {
builder.withoutMetrics();
}
+ if (!jmxReportingEnabled) {
+ builder.withoutJMXReporting();
+ }
+
+ if (sslEnabled) {
+ if (sslOptions == null) {
+ builder.withSSL();
+ } else {
+ builder.withSSL(sslOptions);
+ }
+ }
+
cluster = builder.build();
+
+ if (hostStateListener != null) {
+ cluster.register(hostStateListener);
+ }
+
+ if (latencyTracker != null) {
+ cluster.register(latencyTracker);
+ }
+
+ generateSpecificationsFromFactoryBeans();
+
executeSpecsAndScripts(keyspaceCreations, startupScripts);
}
+ /**
+ * Examines the contents of all the KeyspaceSpecificationFactoryBeans and generates the proper KeyspaceSpecification
+ * from them.
+ */
+ private void generateSpecificationsFromFactoryBeans() {
+
+ for (KeyspaceActionSpecification> spec : keyspaceSpecifications) {
+
+ if (spec instanceof CreateKeyspaceSpecification) {
+ keyspaceCreations.add((CreateKeyspaceSpecification) spec);
+ }
+ if (spec instanceof DropKeyspaceSpecification) {
+ keyspaceDrops.add((DropKeyspaceSpecification) spec);
+ }
+
+ }
+
+ }
+
protected void executeSpecsAndScripts(@SuppressWarnings("rawtypes") List specs, List scripts) {
Session system = null;
@@ -167,7 +229,7 @@ public class CassandraClusterFactoryBean implements FactoryBean, Initia
Iterator> i = specs.iterator();
while (i.hasNext()) {
- KeyspaceNameSpecification> spec = (KeyspaceNameSpecification>) i.next();
+ KeyspaceActionSpecification> spec = (KeyspaceActionSpecification>) i.next();
String cql = (spec instanceof CreateKeyspaceSpecification) ? new CreateKeyspaceCqlGenerator(
(CreateKeyspaceSpecification) spec).toCql() : new DropKeyspaceCqlGenerator(
(DropKeyspaceSpecification) spec).toCql();
@@ -229,15 +291,11 @@ public class CassandraClusterFactoryBean implements FactoryBean, Initia
this.compressionType = compressionType;
}
- public void setLocalPoolingOptions(PoolingOptionsConfig localPoolingOptions) {
- this.localPoolingOptions = localPoolingOptions;
+ public void setPoolingOptions(PoolingOptions poolingOptions) {
+ this.poolingOptions = poolingOptions;
}
- public void setRemotePoolingOptions(PoolingOptionsConfig remotePoolingOptions) {
- this.remotePoolingOptions = remotePoolingOptions;
- }
-
- public void setSocketOptions(SocketOptionsConfig socketOptions) {
+ public void setSocketOptions(SocketOptions socketOptions) {
this.socketOptions = socketOptions;
}
@@ -295,52 +353,76 @@ public class CassandraClusterFactoryBean implements FactoryBean, Initia
throw new IllegalArgumentException("unknown compression type " + type);
}
- private static PoolingOptions configPoolingOptions(HostDistance hostDistance, PoolingOptionsConfig config) {
- PoolingOptions poolingOptions = new PoolingOptions();
-
- if (config.getMinSimultaneousRequests() != null) {
- poolingOptions
- .setMinSimultaneousRequestsPerConnectionThreshold(hostDistance, config.getMinSimultaneousRequests());
- }
- if (config.getMaxSimultaneousRequests() != null) {
- poolingOptions
- .setMaxSimultaneousRequestsPerConnectionThreshold(hostDistance, config.getMaxSimultaneousRequests());
- }
- if (config.getCoreConnections() != null) {
- poolingOptions.setCoreConnectionsPerHost(hostDistance, config.getCoreConnections());
- }
- if (config.getMaxConnections() != null) {
- poolingOptions.setMaxConnectionsPerHost(hostDistance, config.getMaxConnections());
- }
-
- return poolingOptions;
+ /**
+ * @return Returns the keyspaceSpecifications.
+ */
+ public Set> getKeyspaceSpecifications() {
+ return keyspaceSpecifications;
}
- private static SocketOptions configSocketOptions(SocketOptionsConfig config) {
- SocketOptions socketOptions = new SocketOptions();
+ /**
+ * If accumlating is true, we append to the list, otherwise we replace the list.
+ *
+ * @param keyspaceSpecifications The keyspaceSpecifications to set.
+ */
+ public void setKeyspaceSpecifications(Set> keyspaceSpecifications) {
+ log.info("Setter Called");
+ this.keyspaceSpecifications = keyspaceSpecifications;
+ }
- if (config.getConnectTimeoutMls() != null) {
- socketOptions.setConnectTimeoutMillis(config.getConnectTimeoutMls());
- }
- if (config.getKeepAlive() != null) {
- socketOptions.setKeepAlive(config.getKeepAlive());
- }
- if (config.getReuseAddress() != null) {
- socketOptions.setReuseAddress(config.getReuseAddress());
- }
- if (config.getSoLinger() != null) {
- socketOptions.setSoLinger(config.getSoLinger());
- }
- if (config.getTcpNoDelay() != null) {
- socketOptions.setTcpNoDelay(config.getTcpNoDelay());
- }
- if (config.getReceiveBufferSize() != null) {
- socketOptions.setReceiveBufferSize(config.getReceiveBufferSize());
- }
- if (config.getSendBufferSize() != null) {
- socketOptions.setSendBufferSize(config.getSendBufferSize());
- }
+ /**
+ * @param username The username to set.
+ */
+ public void setUsername(String username) {
+ this.username = username;
+ }
- return socketOptions;
+ /**
+ * @param password The password to set.
+ */
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ /**
+ * @param deferredInitialization The deferredInitialization to set.
+ */
+ public void setDeferredInitialization(boolean deferredInitialization) {
+ this.deferredInitialization = deferredInitialization;
+ }
+
+ /**
+ * @param jmxReportingEnabled The jmxReportingEnabled to set.
+ */
+ public void setJmxReportingEnabled(boolean jmxReportingEnabled) {
+ this.jmxReportingEnabled = jmxReportingEnabled;
+ }
+
+ /**
+ * @param sslEnabled The sslEnabled to set.
+ */
+ public void setSslEnabled(boolean sslEnabled) {
+ this.sslEnabled = sslEnabled;
+ }
+
+ /**
+ * @param sslOptions The sslOptions to set.
+ */
+ public void setSslOptions(SSLOptions sslOptions) {
+ this.sslOptions = sslOptions;
+ }
+
+ /**
+ * @param hostStateListener The hostStateListener to set.
+ */
+ public void setHostStateListener(Host.StateListener hostStateListener) {
+ this.hostStateListener = hostStateListener;
+ }
+
+ /**
+ * @param latencyTracker The latencyTracker to set.
+ */
+ public void setLatencyTracker(LatencyTracker latencyTracker) {
+ this.latencyTracker = latencyTracker;
}
}
diff --git a/spring-cassandra/src/main/java/org/springframework/cassandra/config/KeyspaceAction.java b/spring-cassandra/src/main/java/org/springframework/cassandra/config/KeyspaceAction.java
new file mode 100644
index 000000000..51f6e93c6
--- /dev/null
+++ b/spring-cassandra/src/main/java/org/springframework/cassandra/config/KeyspaceAction.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2010-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.cassandra.config;
+
+/**
+ * Available actions for Keyspace Specifications
+ *
+ * @author David Webb
+ */
+public enum KeyspaceAction {
+ CREATE, CREATE_DROP, ALTER;
+}
diff --git a/spring-cassandra/src/main/java/org/springframework/cassandra/config/KeyspaceActionSpecificationFactoryBean.java b/spring-cassandra/src/main/java/org/springframework/cassandra/config/KeyspaceActionSpecificationFactoryBean.java
new file mode 100644
index 000000000..e44d16587
--- /dev/null
+++ b/spring-cassandra/src/main/java/org/springframework/cassandra/config/KeyspaceActionSpecificationFactoryBean.java
@@ -0,0 +1,267 @@
+/*
+ * Copyright 2011-2014 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.cassandra.config;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.FactoryBean;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.cassandra.core.keyspace.CreateKeyspaceSpecification;
+import org.springframework.cassandra.core.keyspace.DefaultOption;
+import org.springframework.cassandra.core.keyspace.DropKeyspaceSpecification;
+import org.springframework.cassandra.core.keyspace.KeyspaceActionSpecification;
+import org.springframework.cassandra.core.keyspace.KeyspaceOption;
+import org.springframework.cassandra.core.keyspace.KeyspaceOption.ReplicationStrategy;
+import org.springframework.cassandra.core.keyspace.Option;
+import org.springframework.util.Assert;
+
+/**
+ * A single keyspace XML Element can result in multiple actions. Example: {@literal CREATE_DROP}.
+ *
+ * This FactoryBean inspects the action required to satisfy the keyspace element, and then returns a Set of atomic
+ * {@link KeyspaceActionSpecification} required to satisfy the configuration action.
+ *
+ * @author David Webb
+ *
+ */
+public class KeyspaceActionSpecificationFactoryBean implements FactoryBean>>,
+ InitializingBean, DisposableBean {
+
+ private final static Logger log = LoggerFactory.getLogger(KeyspaceActionSpecificationFactoryBean.class);
+
+ private KeyspaceAction action;
+ private String name;
+ private List networkTopologyDataCenters = new LinkedList();
+ private List networkTopologyReplicationFactors = new LinkedList();
+ private String replicationStrategy;
+ private long replicationFactor;
+ private boolean durableWrites = false;
+ private boolean ifNotExists = false;
+
+ private Set> specs = new HashSet>();
+
+ @Override
+ public void destroy() throws Exception {
+ action = null;
+ name = null;
+ networkTopologyDataCenters = null;
+ networkTopologyReplicationFactors = null;
+ replicationStrategy = null;
+ specs = null;
+ }
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+
+ Assert.hasText(name, "Keyspace Name is required for a Keyspace Action");
+ Assert.notNull(action, "Keyspace Action is required for a Keyspace Action");
+
+ switch (action) {
+ case CREATE_DROP:
+ specs.add(generateDropKeyspaceSpecification());
+ case CREATE:
+ // Assert.notNull(replicationStrategy, "Replication Strategy is required to create a Keyspace");
+ specs.add(generateCreateKeyspaceSpecification());
+ break;
+ case ALTER:
+ break;
+ }
+
+ }
+
+ /**
+ * Generate a {@link CreateKeyspaceSpecification} for the keyspace.
+ *
+ * @return The {@link CreateKeyspaceSpecification}
+ */
+ private CreateKeyspaceSpecification generateCreateKeyspaceSpecification() {
+
+ CreateKeyspaceSpecification create = new CreateKeyspaceSpecification();
+ create.name(name).ifNotExists(ifNotExists).with(KeyspaceOption.DURABLE_WRITES, durableWrites);
+
+ Map