DATACASS-126 - Added DataStax Java Driver 2.0.1 dependency and

cassandra-unit 2.0.X dependency.

Full support for Cassandra 2.0.X
This commit is contained in:
David T Webb
2014-04-29 09:45:14 -04:00
parent b8cbd20ae1
commit 8edc4ce1a9
25 changed files with 246 additions and 247 deletions

24
pom.xml
View File

@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
@@ -31,8 +32,8 @@
<project.type>multi</project.type>
<dist.id>spring-data-cassandra</dist.id>
<springdata.commons>1.8.0.BUILD-SNAPSHOT</springdata.commons>
<cassandra-unit.version>1.2.0.1</cassandra-unit.version>
<cassandra-driver-core.version>1.0.5-dse</cassandra-driver-core.version>
<cassandra-unit.version>2.0.2.1</cassandra-unit.version>
<cassandra-driver-dse.version>2.0.1</cassandra-driver-dse.version>
<failsafe.version>2.16</failsafe.version>
<jamm.version>0.2.5</jamm.version>
</properties>
@@ -78,13 +79,21 @@
<dependencies>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>${cassandra-driver-core.version}</version>
<artifactId>cassandra-driver-dse</artifactId>
<version>${cassandra-driver-dse.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@@ -203,7 +212,7 @@
<dependencies>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<artifactId>cassandra-driver-dse</artifactId>
</dependency>
<dependency>
<groupId>com.github.stephenc</groupId>
@@ -286,7 +295,8 @@
<version>${failsafe.version}</version>
<configuration>
<forkCount>1</forkCount>
<argLine>-Xmx1024m -Xss512m -javaagent:${com.github.stephenc:jamm:jar}</argLine>
<argLine>-Xmx1024m -Xss512m
-javaagent:${com.github.stephenc:jamm:jar}</argLine>
<reuseForks>true</reuseForks>
<useFile>false</useFile>
<includes>

View File

@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
@@ -47,13 +48,7 @@
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<exclusions>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
</exclusions>
<artifactId>cassandra-driver-dse</artifactId>
</dependency>
<dependency>
<groupId>javax.enterprise</groupId>
@@ -81,6 +76,50 @@
<artifactId>el-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.cassandraunit</groupId>
<artifactId>cassandra-unit</artifactId>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>2.0.6</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.cassandraunit</groupId>
<artifactId>cassandra-unit-spring</artifactId>
<exclusions>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>15.0</version>
</dependency>
</dependencies>
<build>

View File

@@ -86,7 +86,6 @@ public class CassandraCqlClusterFactoryBean implements FactoryBean<Cluster>, Ini
private ReconnectionPolicy reconnectionPolicy;
private RetryPolicy retryPolicy;
private boolean metricsEnabled = DEFAULT_METRICS_ENABLED;
private boolean deferredInitialization = DEFAULT_DEFERRED_INITIALIZATION;
private boolean jmxReportingEnabled = DEFAULT_JMX_REPORTING_ENABLED;
private boolean sslEnabled = DEFAULT_SSL_ENABLED;
private SSLOptions sslOptions;
@@ -163,10 +162,6 @@ public class CassandraCqlClusterFactoryBean implements FactoryBean<Cluster>, Ini
builder.withRetryPolicy(retryPolicy);
}
if (deferredInitialization) {
builder.withDeferredInitialization();
}
if (!metricsEnabled) {
builder.withoutMetrics();
}
@@ -260,7 +255,7 @@ public class CassandraCqlClusterFactoryBean implements FactoryBean<Cluster>, Ini
} finally {
if (system != null) {
system.shutdown();
system.close();
}
}
}
@@ -269,7 +264,7 @@ public class CassandraCqlClusterFactoryBean implements FactoryBean<Cluster>, Ini
public void destroy() throws Exception {
executeSpecsAndScripts(keyspaceDrops, shutdownScripts);
cluster.shutdown();
cluster.close();
}
/**
@@ -379,13 +374,6 @@ public class CassandraCqlClusterFactoryBean implements FactoryBean<Cluster>, Ini
this.password = password;
}
/**
* @param deferredInitialization The deferredInitialization to set.
*/
public void setDeferredInitialization(boolean deferredInitialization) {
this.deferredInitialization = deferredInitialization;
}
/**
* @param jmxReportingEnabled The jmxReportingEnabled to set.
*/

View File

@@ -108,7 +108,7 @@ public class CassandraCqlSessionFactoryBean implements FactoryBean<Session>, Ini
public void destroy() throws Exception {
executeScripts(shutdownScripts);
session.shutdown();
session.close();
}
/**

View File

@@ -15,11 +15,6 @@
*/
package org.springframework.cassandra.config.xml;
import static org.springframework.cassandra.config.xml.ParsingUtils.addOptionalPropertyReference;
import static org.springframework.cassandra.config.xml.ParsingUtils.addOptionalPropertyValue;
import static org.springframework.cassandra.config.xml.ParsingUtils.addRequiredPropertyValue;
import static org.springframework.cassandra.config.xml.ParsingUtils.getSourceBeanDefinition;
import java.util.ArrayList;
import java.util.List;
@@ -46,6 +41,11 @@ import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.SocketOptions;
import static org.springframework.cassandra.config.xml.ParsingUtils.addOptionalPropertyReference;
import static org.springframework.cassandra.config.xml.ParsingUtils.addOptionalPropertyValue;
import static org.springframework.cassandra.config.xml.ParsingUtils.addRequiredPropertyValue;
import static org.springframework.cassandra.config.xml.ParsingUtils.getSourceBeanDefinition;
/**
* Parses the {@literal <cluster>} element of the XML Configuration.
*
@@ -97,7 +97,6 @@ public class CassandraCqlClusterParser extends AbstractBeanDefinitionParser {
addOptionalPropertyValue(builder, "compressionType", element, "compression", null);
addOptionalPropertyValue(builder, "username", element, "username", null);
addOptionalPropertyValue(builder, "password", element, "password", null);
addOptionalPropertyValue(builder, "deferredInitialization", element, "deferred-initialization", null);
addOptionalPropertyValue(builder, "metricsEnabled", element, "metrics-enabled", null);
addOptionalPropertyValue(builder, "jmxReportingEnabled", element, "jmx-reporting-enabled", null);
addOptionalPropertyValue(builder, "sslEnabled", element, "ssl-enabled", null);

View File

@@ -32,10 +32,10 @@ import org.springframework.cassandra.core.keyspace.DropKeyspaceSpecification;
import org.springframework.cassandra.core.keyspace.DropTableSpecification;
import org.springframework.dao.DataAccessException;
import com.datastax.driver.core.Query;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.Batch;
import com.datastax.driver.core.querybuilder.Delete;
import com.datastax.driver.core.querybuilder.Insert;
@@ -78,9 +78,9 @@ public interface CqlOperations {
/**
* Executes the supplied Query and returns nothing.
*
* @param query The {@link Query} to execute
* @param query The {@link Statement} to execute
*/
void execute(Query query) throws DataAccessException;
void execute(Statement query) throws DataAccessException;
/**
* Executes the supplied Delete Query and returns nothing.
@@ -206,44 +206,44 @@ public interface CqlOperations {
/**
* Executes the supplied CQL Batch Asynchronously and returns nothing.
*
* @param query The {@link Query} to execute
* @param query The {@link Batch} to execute
*/
void executeAsynchronously(Batch batch) throws DataAccessException;
/**
* Executes the supplied CQL Query Asynchronously and returns nothing.
*
* @param query The {@link Query} to execute
* @param query The {@link Statement} to execute
*/
void executeAsynchronously(Query query) throws DataAccessException;
void executeAsynchronously(Statement query) throws DataAccessException;
/**
* Executes the supplied CQL Query Asynchronously and returns nothing.
*
* @param query The {@link Query} to execute
* @param query The {@link Statement} to execute
*/
void executeAsynchronously(Query query, Runnable runnable) throws DataAccessException;
void executeAsynchronously(Statement query, Runnable runnable) throws DataAccessException;
/**
* Executes the supplied CQL Query Asynchronously and returns nothing.
*
* @param query The {@link Query} to execute
* @param query The {@link Statement} to execute
*/
void executeAsynchronously(Query query, AsynchronousQueryListener listener) throws DataAccessException;
void executeAsynchronously(Statement query, AsynchronousQueryListener listener) throws DataAccessException;
/**
* Executes the supplied CQL Query Asynchronously and returns nothing.
*
* @param query The {@link Query} to execute
* @param query The {@link Statement} to execute
*/
void executeAsynchronously(Query query, Runnable runnable, Executor executor) throws DataAccessException;
void executeAsynchronously(Statement query, Runnable runnable, Executor executor) throws DataAccessException;
/**
* Executes the supplied CQL Query Asynchronously and returns nothing.
*
* @param query The {@link Query} to execute
* @param query The {@link Statement} to execute
*/
void executeAsynchronously(Query query, AsynchronousQueryListener listener, Executor executor)
void executeAsynchronously(Statement query, AsynchronousQueryListener listener, Executor executor)
throws DataAccessException;
/**

View File

@@ -57,7 +57,6 @@ import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.ColumnDefinitions.Definition;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Query;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
@@ -94,13 +93,13 @@ public class CqlTemplate extends CassandraAccessor implements CqlOperations {
protected static final Logger log = LoggerFactory.getLogger(CqlTemplate.class);
/**
* Add common {@link Query} options for all types of queries.
* Add common {@link Statement} options for all types of queries.
*
* @param q
* @param options
* @return the {@link Query} given.
* @return the {@link Statement} given.
*/
public static Query addQueryOptions(Query q, QueryOptions options) {
public static Statement addQueryOptions(Statement q, QueryOptions options) {
if (options == null) {
return q;
@@ -224,7 +223,7 @@ public class CqlTemplate extends CassandraAccessor implements CqlOperations {
}
@Override
public void execute(Query query) throws DataAccessException {
public void execute(Statement query) throws DataAccessException {
doExecute(query);
}
@@ -505,7 +504,7 @@ public class CqlTemplate extends CassandraAccessor implements CqlOperations {
* @param q The query to execute.
* @param options The {@link QueryOptions}. May be null.
*/
protected ResultSet doExecute(final Query q) {
protected ResultSet doExecute(final Statement q) {
return doExecute(new SessionCallback<ResultSet>() {
@@ -521,7 +520,7 @@ public class CqlTemplate extends CassandraAccessor implements CqlOperations {
});
}
protected ResultSetFuture doExecuteAsync(final Query q) {
protected ResultSetFuture doExecuteAsync(final Statement q) {
return doExecute(new SessionCallback<ResultSetFuture>() {
@@ -669,12 +668,12 @@ public class CqlTemplate extends CassandraAccessor implements CqlOperations {
}
@Override
public void executeAsynchronously(Query query) throws DataAccessException {
public void executeAsynchronously(Statement query) throws DataAccessException {
doExecuteAsync(query);
}
@Override
public void executeAsynchronously(Query query, Runnable listener) throws DataAccessException {
public void executeAsynchronously(Statement query, Runnable listener) throws DataAccessException {
executeAsynchronously(query, listener, new Executor() {
@Override
@@ -685,7 +684,7 @@ public class CqlTemplate extends CassandraAccessor implements CqlOperations {
}
@Override
public void executeAsynchronously(Query query, AsynchronousQueryListener listener) throws DataAccessException {
public void executeAsynchronously(Statement query, AsynchronousQueryListener listener) throws DataAccessException {
executeAsynchronously(query, listener, new Executor() {
@Override
@@ -696,7 +695,7 @@ public class CqlTemplate extends CassandraAccessor implements CqlOperations {
}
@Override
public void executeAsynchronously(final Query query, final Runnable listener, final Executor executor)
public void executeAsynchronously(final Statement query, final Runnable listener, final Executor executor)
throws DataAccessException {
execute(new SessionCallback<Object>() {
@Override
@@ -709,8 +708,8 @@ public class CqlTemplate extends CassandraAccessor implements CqlOperations {
}
@Override
public void executeAsynchronously(final Query query, final AsynchronousQueryListener listener, final Executor executor)
throws DataAccessException {
public void executeAsynchronously(final Statement query, final AsynchronousQueryListener listener,
final Executor executor) throws DataAccessException {
execute(new SessionCallback<Object>() {
@Override
@@ -991,7 +990,7 @@ public class CqlTemplate extends CassandraAccessor implements CqlOperations {
@Override
public void truncate(CqlIdentifier tableName) throws DataAccessException {
Truncate truncate = QueryBuilder.truncate(tableName.toCql());
doExecute(truncate.getQueryString(), null);
doExecute(truncate);
}
@Override
@@ -1205,7 +1204,7 @@ public class CqlTemplate extends CassandraAccessor implements CqlOperations {
@Override
public long count(CqlIdentifier tableName) {
return selectCount(QueryBuilder.select().countAll().from(tableName.toCql()).getQueryString());
return selectCount(QueryBuilder.select().countAll().from(tableName.toCql()));
}
@Override
@@ -1213,9 +1212,9 @@ public class CqlTemplate extends CassandraAccessor implements CqlOperations {
return count(cqlId(tableName));
}
protected long selectCount(String countQuery) {
protected long selectCount(Select select) {
return query(countQuery, new ResultSetExtractor<Long>() {
return query(select, new ResultSetExtractor<Long>() {
@Override
public Long extractData(ResultSet rs) throws DriverException, DataAccessException {

View File

@@ -31,14 +31,14 @@ public class CassandraConnectionFailureException extends DataAccessResourceFailu
private static final long serialVersionUID = 6299912054261646552L;
private final Map<InetAddress, String> messagesByHost = new HashMap<InetAddress, String>();
private final Map<InetAddress, Throwable> messagesByHost = new HashMap<InetAddress, Throwable>();
public CassandraConnectionFailureException(Map<InetAddress, String> messagesByHost, String msg, Throwable cause) {
public CassandraConnectionFailureException(Map<InetAddress, Throwable> map, String msg, Throwable cause) {
super(msg, cause);
this.messagesByHost.putAll(messagesByHost);
this.messagesByHost.putAll(map);
}
public Map<InetAddress, String> getMessagesByHost() {
public Map<InetAddress, Throwable> getMessagesByHost() {
return Collections.unmodifiableMap(messagesByHost);
}
}

View File

@@ -172,14 +172,6 @@ Determine whether or not to enable JMX Reporting. Defaults to true.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="deferred-initialization" type="xsd:string"
default="false">
<xsd:annotation>
<xsd:documentation><![CDATA[
Determine if we defer initalizing the cluster until a connection is requested. Defaults to false.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="ssl-enabled" type="xsd:string"
default="false">
<xsd:annotation>

View File

@@ -63,8 +63,8 @@ public class AbstractEmbeddedCassandraIntegrationTest {
}
@BeforeClass
public static void startCassandra() throws ConfigurationException, TTransportException, IOException,
InterruptedException {
public static void startCassandra() throws TTransportException, IOException, InterruptedException,
ConfigurationException {
EmbeddedCassandraServerHelper.startEmbeddedCassandra(CASSANDRA_CONFIG);
}

View File

@@ -15,9 +15,6 @@
*/
package org.springframework.cassandra.test.integration.config.java;
import static org.springframework.cassandra.core.cql.generator.CreateKeyspaceCqlGenerator.toCql;
import static org.springframework.cassandra.core.keyspace.CreateKeyspaceSpecification.createKeyspace;
import org.springframework.cassandra.config.CassandraCqlSessionFactoryBean;
import org.springframework.cassandra.config.java.AbstractSessionConfiguration;
import org.springframework.context.annotation.Configuration;
@@ -26,6 +23,9 @@ import org.springframework.util.StringUtils;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.Session;
import static org.springframework.cassandra.core.cql.generator.CreateKeyspaceCqlGenerator.toCql;
import static org.springframework.cassandra.core.keyspace.CreateKeyspaceSpecification.createKeyspace;
@Configuration
public abstract class AbstractKeyspaceCreatingConfiguration extends AbstractSessionConfiguration {
@@ -50,6 +50,6 @@ public abstract class AbstractKeyspaceCreatingConfiguration extends AbstractSess
}
system.execute(toCql(createKeyspace().name(keyspace).withSimpleReplication()));
system.shutdown();
system.close();
}
}

View File

@@ -29,7 +29,6 @@ import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mortbay.log.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cassandra.core.ConsistencyLevel;
@@ -1145,7 +1144,7 @@ public class CQLOperationsTest extends AbstractKeyspaceCreatingIntegrationTest {
@Test
public void insertAndTruncateQueryObjectTest() {
Log.info("Starting Insert and Truncate Query Object Test");
log.info("Starting Insert and Truncate Query Object Test");
String tableName = "truncate_test";

View File

@@ -9,14 +9,16 @@
<context:property-placeholder
location="classpath:/org/springframework/cassandra/test/integration/config/xml/ppncxct.properties" />
<bean id="authProvider" class="com.datastax.driver.core.sasl.DseAuthProvider" />
<bean id="authProvider" class="com.datastax.driver.auth.DseAuthProvider" />
<!--
<bean id="loadBalancingPolicy"
class="com.datastax.driver.core.policies.DCAwareRoundRobinPolicy">
<constructor-arg name="localDc" value="${lb.policy.dcAware.localDc}" />
<constructor-arg name="usedHostsPerRemoteDc"
value="${lb.policy.dcAware.remoteHosts}" />
</bean>
-->
<bean id="reconnectionPolicy"
class="com.datastax.driver.core.policies.ConstantReconnectionPolicy">
@@ -35,9 +37,9 @@
<cassandra:cluster contact-points="${cluster.contactPoints}"
port="${cluster.port}" compression="${cluster.compression}"
auth-info-provider-ref="authProvider" load-balancing-policy-ref="loadBalancingPolicy"
auth-info-provider-ref="authProvider"
username="${auth.username}" password="${auth.password}"
deferred-initialization="${cluster.deferredInit}" metrics-enabled="${cluster.metricsEnabled}"
metrics-enabled="${cluster.metricsEnabled}"
jmx-reporting-enabled="${cluster.jmxReportingEnabled}"
reconnection-policy-ref="reconnectionPolicy" retry-policy-ref="retryPolicy"
host-state-listener-ref="hostStateListener" latency-tracker-ref="latencyTracker">

View File

@@ -1,7 +1,6 @@
cluster.contactPoints=localhost
cluster.port=@build.cassandra.native_transport_port@
cluster.compression=SNAPPY
cluster.deferredInit=true
cluster.metricsEnabled=false
cluster.jmxReportingEnabled=false
cluster.reconnection.delayMillis=5000
@@ -12,7 +11,7 @@ dc1.name=DCJAX
dc1.rf=2
dc2.name=DCCTL
dc2.rf=3
lb.policy.dcAware.remoteHosts=4
lb.policy.dcAware.remoteHosts=0
lb.policy.dcAware.localDc=DCJAX
auth.username=test
auth.password=pass

View File

@@ -1,6 +1,5 @@
# Cassandra storage config YAML
# NOTE:
# See http://wiki.apache.org/cassandra/StorageConfiguration for
# full explanations of configuration directives
# /NOTE
@@ -170,24 +169,6 @@ row_cache_save_period: 0
# Disabled by default, meaning all keys are going to be saved
# row_cache_keys_to_save: 100
# The provider for the row cache to use.
#
# Supported values are: ConcurrentLinkedHashCacheProvider, SerializingCacheProvider
#
# SerializingCacheProvider serialises the contents of the row and stores
# it in native memory, i.e., off the JVM Heap. Serialized rows take
# significantly less memory than "live" rows in the JVM, so you can cache
# more rows in a given memory footprint. And storing the cache off-heap
# means you can use smaller heap sizes, reducing the impact of GC pauses.
# Note however that when a row is requested from the row cache, it must be
# deserialized into the heap for use.
#
# It is also valid to specify the fully-qualified class name to a class
# that implements org.apache.cassandra.cache.IRowCacheProvider.
#
# Defaults to SerializingCacheProvider
row_cache_provider: SerializingCacheProvider
# saved caches
saved_caches_directory: target/embeddedCassandra/saved_caches
@@ -230,31 +211,6 @@ seed_provider:
# Ex: "<ip1>,<ip2>,<ip3>"
- seeds: "127.0.0.1"
# emergency pressure valve: each time heap usage after a full (CMS)
# garbage collection is above this fraction of the max, Cassandra will
# flush the largest memtables.
#
# Set to 1.0 to disable. Setting this lower than
# CMSInitiatingOccupancyFraction is not likely to be useful.
#
# RELYING ON THIS AS YOUR PRIMARY TUNING MECHANISM WILL WORK POORLY:
# it is most effective under light to moderate load, or read-heavy
# workloads; under truly massive write load, it will often be too
# little, too late.
flush_largest_memtables_at: 0.75
# emergency pressure valve #2: the first time heap usage after a full
# (CMS) garbage collection is above this fraction of the max,
# Cassandra will reduce cache maximum _capacity_ to the given fraction
# of the current _size_. Should usually be set substantially above
# flush_largest_memtables_at, since that will have less long-term
# impact on the system.
#
# Set to 1.0 to disable. Setting this lower than
# CMSInitiatingOccupancyFraction is not likely to be useful.
reduce_cache_sizes_at: 0.85
reduce_cache_capacity_to: 0.6
# For workloads with more data than can fit in memory, Cassandra's
# bottleneck will be reads that need to fetch data from
# disk. "concurrent_reads" should be set to (16 * number_of_drives) in

View File

@@ -15,8 +15,6 @@
*/
package org.springframework.data.cassandra.convert;
import static org.springframework.data.cassandra.repository.support.BasicMapId.*;
import java.io.Serializable;
import java.util.Map;
@@ -51,6 +49,8 @@ import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Update;
import static org.springframework.data.cassandra.repository.support.BasicMapId.id;
/**
* {@link CassandraConverter} that uses a {@link MappingContext} to do sophisticated mapping of domain objects to
* {@link Row}.
@@ -243,13 +243,18 @@ public class MappingCassandraConverter extends AbstractCassandraConverter implem
Object value = wrapper.getProperty(prop, prop.getType());
log.debug("prop.type -> " + prop.getType().getName());
log.debug("prop.value -> " + value);
if (prop.isCompositePrimaryKey()) {
log.debug("prop is a compositeKey");
writeInsertFromWrapper(BeanWrapper.create(value, conversionService), insert,
prop.getCompositePrimaryKeyEntity());
return;
}
if (value != null) {
log.debug(String.format("Adding insert.value [%s] - [%s]", prop.getColumnName().toCql(), value));
insert.value(prop.getColumnName().toCql(), value);
}
}

View File

@@ -23,6 +23,8 @@ import org.springframework.cassandra.core.WriteOptions;
import org.springframework.cassandra.core.cql.CqlIdentifier;
import org.springframework.data.cassandra.convert.CassandraConverter;
import com.datastax.driver.core.querybuilder.Select;
/**
* Operations for interacting with Cassandra. These operations are used by the Repository implementation, but can also
* be used directly when that is desired by the developer.
@@ -50,10 +52,19 @@ public interface CassandraOperations extends CqlOperations {
*/
<T> List<T> select(String cql, Class<T> type);
/**
* Execute the Select Query and convert to the list of entities
*
* @param select must not be {@literal null}.
* @param type must not be {@literal null}, mapped entity type.
* @return
*/
<T> List<T> select(Select select, Class<T> type);
<T> T selectOneById(Class<T> type, Object id);
/**
* Execute query and convert ResultSet to the entity
* Execute CQL and convert ResultSet to the entity
*
* @param query must not be {@literal null}.
* @param type must not be {@literal null}, mapped entity type.
@@ -61,6 +72,15 @@ public interface CassandraOperations extends CqlOperations {
*/
<T> T selectOne(String cql, Class<T> type);
/**
* Execute Select query and convert ResultSet to the entity
*
* @param query must not be {@literal null}.
* @param type must not be {@literal null}, mapped entity type.
* @return
*/
<T> T selectOne(Select select, Class<T> type);
boolean exists(Class<?> type, Object id);
long count(Class<?> type);

View File

@@ -153,7 +153,7 @@ public class CassandraTemplate extends CqlTemplate implements CassandraOperation
Delete delete = QueryBuilder.delete().from(entity.getTableName().toCql());
appendIdCriteria(delete.where(), entity, id);
execute(delete.getQueryString());
execute(delete);
}
@Override
@@ -233,7 +233,7 @@ public class CassandraTemplate extends CqlTemplate implements CassandraOperation
@Override
public <T> List<T> selectAll(Class<T> type) {
return select(QueryBuilder.select().all().from(getTableName(type).toCql()).getQueryString(), type);
return select(QueryBuilder.select().all().from(getTableName(type).toCql()), type);
}
@Override
@@ -245,6 +245,14 @@ public class CassandraTemplate extends CqlTemplate implements CassandraOperation
return select(cql, new CassandraConverterRowCallback<T>(cassandraConverter, type));
}
@Override
public <T> List<T> select(Select select, Class<T> type) {
Assert.notNull(select);
return select(select, new CassandraConverterRowCallback<T>(cassandraConverter, type));
}
@Override
public <T> List<T> selectBySimpleIds(Class<T> type, Iterable<?> ids) {
@@ -259,7 +267,7 @@ public class CassandraTemplate extends CqlTemplate implements CassandraOperation
Select select = QueryBuilder.select().all().from(entity.getTableName().toCql());
select.where(QueryBuilder.in(entity.getIdProperty().getColumnName().toCql(), CollectionUtils.toArray(ids)));
return select(select.getQueryString(), type);
return select(select, type);
}
@Override
@@ -276,7 +284,7 @@ public class CassandraTemplate extends CqlTemplate implements CassandraOperation
Select select = QueryBuilder.select().all().from(entity.getTableName().toCql());
appendIdCriteria(select.where(), entity, id);
return selectOne(select.getQueryString(), type);
return selectOne(select, type);
}
protected interface ClauseCallback {
@@ -353,6 +361,11 @@ public class CassandraTemplate extends CqlTemplate implements CassandraOperation
return selectOne(cql, new CassandraConverterRowCallback<T>(cassandraConverter, type));
}
@Override
public <T> T selectOne(Select select, Class<T> type) {
return selectOne(select, new CassandraConverterRowCallback<T>(cassandraConverter, type));
}
@Override
public <T> List<T> update(List<T> entities) {
return update(entities, null);
@@ -421,6 +434,30 @@ public class CassandraTemplate extends CqlTemplate implements CassandraOperation
return result;
}
protected <T> List<T> select(final Select query, CassandraConverterRowCallback<T> readRowCallback) {
ResultSet resultSet = doExecute(new SessionCallback<ResultSet>() {
@Override
public ResultSet doInSession(Session s) throws DataAccessException {
return s.execute(query);
}
});
if (resultSet == null) {
return null;
}
List<T> result = new ArrayList<T>();
Iterator<Row> iterator = resultSet.iterator();
while (iterator.hasNext()) {
Row row = iterator.next();
result.add(readRowCallback.doWith(row));
}
return result;
}
/**
* @param query
* @param readRowCallback
@@ -445,6 +482,23 @@ public class CassandraTemplate extends CqlTemplate implements CassandraOperation
return null;
}
protected <T> T selectOne(Select query, CassandraConverterRowCallback<T> readRowCallback) {
ResultSet resultSet = query(query);
Iterator<Row> iterator = resultSet.iterator();
if (iterator.hasNext()) {
Row row = iterator.next();
T result = readRowCallback.doWith(row);
if (iterator.hasNext()) {
throw new DuplicateKeyException("found two or more results in query " + query);
}
return result;
}
return null;
}
/**
* Perform the deletion on a list of objects
*
@@ -458,13 +512,10 @@ public class CassandraTemplate extends CqlTemplate implements CassandraOperation
Batch b = createDeleteBatchQuery(getTableName(entities.get(0).getClass()).toCql(), entities, options,
cassandraConverter);
String query = b.getQueryString();
logger.debug(query);
if (asynchronously) {
executeAsynchronously(query);
executeAsynchronously(b);
} else {
execute(query);
execute(b);
}
}
@@ -474,13 +525,10 @@ public class CassandraTemplate extends CqlTemplate implements CassandraOperation
Insert insert = createInsertQuery(getTableName(entity.getClass()).toCql(), entity, options, cassandraConverter);
String query = insert.getQueryString();
logger.debug(query);
if (asynchronously) {
executeAsynchronously(query);
executeAsynchronously(insert);
} else {
execute(query);
execute(insert);
}
return entity;
@@ -493,13 +541,10 @@ public class CassandraTemplate extends CqlTemplate implements CassandraOperation
Batch b = createInsertBatchQuery(getTableName(entities.get(0).getClass()).toCql(), entities, options,
cassandraConverter);
String query = b.getQueryString();
logger.debug(query);
if (asychronously) {
executeAsynchronously(query);
executeAsynchronously(b);
} else {
execute(query);
execute(b);
}
return entities;
@@ -521,13 +566,10 @@ public class CassandraTemplate extends CqlTemplate implements CassandraOperation
Batch b = toUpdateBatchQuery(getTableName(entities.get(0).getClass()).toCql(), entities, options,
cassandraConverter);
String query = b.getQueryString();
logger.debug(query);
if (asychronously) {
executeAsynchronously(query);
executeAsynchronously(b);
} else {
execute(query);
execute(b);
}
return entities;
@@ -545,13 +587,10 @@ public class CassandraTemplate extends CqlTemplate implements CassandraOperation
Delete delete = createDeleteQuery(getTableName(entity.getClass()).toCql(), entity, options, cassandraConverter);
String query = delete.getQueryString();
logger.debug(query);
if (asynchronously) {
executeAsynchronously(query);
executeAsynchronously(delete);
} else {
execute(query);
execute(delete);
}
}
@@ -568,15 +607,12 @@ public class CassandraTemplate extends CqlTemplate implements CassandraOperation
Assert.notNull(entity);
Update q = toUpdateQuery(getTableName(entity.getClass()).toCql(), entity, options, cassandraConverter);
String query = q.getQueryString();
logger.debug(query);
Update update = toUpdateQuery(getTableName(entity.getClass()).toCql(), entity, options, cassandraConverter);
if (asychronously) {
executeAsynchronously(query);
executeAsynchronously(update);
} else {
execute(query);
execute(update);
}
return entity;
@@ -594,19 +630,19 @@ public class CassandraTemplate extends CqlTemplate implements CassandraOperation
public static Insert createInsertQuery(String tableName, Object objectToSave, WriteOptions options,
EntityWriter<Object, Object> entityWriter) {
Insert q = QueryBuilder.insertInto(tableName);
Insert insert = QueryBuilder.insertInto(tableName);
/*
* Write properties
*/
entityWriter.write(objectToSave, q);
entityWriter.write(objectToSave, insert);
/*
* Add Query Options
*/
CqlTemplate.addWriteOptions(q, options);
CqlTemplate.addWriteOptions(insert, options);
return q;
return insert;
}
/**
@@ -621,19 +657,19 @@ public class CassandraTemplate extends CqlTemplate implements CassandraOperation
public static Update toUpdateQuery(String tableName, Object objectToSave, WriteOptions options,
EntityWriter<Object, Object> entityWriter) {
Update q = QueryBuilder.update(tableName);
Update update = QueryBuilder.update(tableName);
/*
* Write properties
*/
entityWriter.write(objectToSave, q);
entityWriter.write(objectToSave, update);
/*
* Add Query Options
*/
CqlTemplate.addWriteOptions(q, options);
CqlTemplate.addWriteOptions(update, options);
return q;
return update;
}
/**
@@ -704,14 +740,15 @@ public class CassandraTemplate extends CqlTemplate implements CassandraOperation
EntityWriter<Object, Object> entityWriter) {
Delete.Selection ds = QueryBuilder.delete();
Delete q = ds.from(tableName);
Where w = q.where();
Delete delete = ds.from(tableName);
Where w = delete.where();
entityWriter.write(object, w);
CqlTemplate.addQueryOptions(q, options);
CqlTemplate.addQueryOptions(delete, options);
return q;
return delete;
}
/**

View File

@@ -110,6 +110,6 @@ public class SimpleCassandraRepository<T, ID extends Serializable> implements Ty
}
protected List<T> findAll(Select query) {
return template.select(query.getQueryString(), entityInformation.getJavaType());
return template.select(query, entityInformation.getJavaType());
}
}

View File

@@ -181,14 +181,6 @@ Determine whether or not to enable JMX Reporting. Defaults to true.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="deferred-initialization" type="xsd:string"
default="false">
<xsd:annotation>
<xsd:documentation><![CDATA[
Determine if we defer initalizing the cluster until a connection is requested. Defaults to false.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="ssl-enabled" type="xsd:string"
default="false">
<xsd:annotation>

View File

@@ -85,7 +85,7 @@ public class CollectionsRowValueProviderTest extends AbstractSpringDataEmbeddedC
Select select = QueryBuilder.select().all().from("bookHistory");
select.where(QueryBuilder.eq("isbn", "123456-1"));
BookHistory b = template.selectOne(select.getQueryString(), BookHistory.class);
BookHistory b = template.selectOne(select, BookHistory.class);
Assert.assertNotNull(b.getCheckOuts());
@@ -127,7 +127,7 @@ public class CollectionsRowValueProviderTest extends AbstractSpringDataEmbeddedC
Select select = QueryBuilder.select().all().from("bookReference");
select.where(QueryBuilder.eq("isbn", "123456-1"));
BookReference b = template.selectOne(select.getQueryString(), BookReference.class);
BookReference b = template.selectOne(select, BookReference.class);
Assert.assertNotNull(b.getReferences());
Assert.assertNotNull(b.getBookmarks());

View File

@@ -15,10 +15,6 @@
*/
package org.springframework.data.cassandra.test.integration.querymethods.declared;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;
import java.lang.reflect.Method;
import org.junit.Before;
@@ -40,6 +36,12 @@ import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;
/**
* Unit tests for {@link StringBasedCassandraQuery}.
*/
@@ -74,6 +76,7 @@ public class StringBasedCassandraQueryIntegrationTests {
String table = Person.class.getSimpleName().toLowerCase();
Select expected = QueryBuilder.select().all().from(table);
expected.setForceNoValues(true);
expected.where(QueryBuilder.eq("lastname", "Matthews"));
assertThat(actual.getQueryString(), is(expected.getQueryString()));
@@ -93,6 +96,7 @@ public class StringBasedCassandraQueryIntegrationTests {
String table = Person.class.getSimpleName().toLowerCase();
Select expected = QueryBuilder.select().all().from(table);
expected.setForceNoValues(true);
expected.where(QueryBuilder.eq("lastname", "Matthews")).and(QueryBuilder.eq("firstname", "John"));
assertThat(actual.getQueryString(), is(expected.getQueryString()));

View File

@@ -616,7 +616,7 @@ public class CassandraDataOperationsTest extends AbstractSpringDataEmbeddedCassa
Select select = QueryBuilder.select().all().from("book");
select.where(QueryBuilder.eq("isbn", "123456-1"));
Book b = template.selectOne(select.getQueryString(), Book.class);
Book b = template.selectOne(select, Book.class);
log.debug("SingleSelect Book Title -> " + b.getTitle());
log.debug("SingleSelect Book Author -> " + b.getAuthor());
@@ -635,7 +635,7 @@ public class CassandraDataOperationsTest extends AbstractSpringDataEmbeddedCassa
Select select = QueryBuilder.select().all().from("book");
List<Book> bookz = template.select(select.getQueryString(), Book.class);
List<Book> bookz = template.select(select, Book.class);
log.debug("Book Count -> " + bookz.size());

View File

@@ -11,9 +11,10 @@
<logger name="org.springframework.cassandra" level="info" />
<logger name="org.springframework.data.cassandra" level="info" />
<logger name="com.datastax.driver" level="info" />
<root level="warn">
<appender-ref ref="console" />
</root>
</configuration>
</configuration>

View File

@@ -170,24 +170,6 @@ row_cache_save_period: 0
# Disabled by default, meaning all keys are going to be saved
# row_cache_keys_to_save: 100
# The provider for the row cache to use.
#
# Supported values are: ConcurrentLinkedHashCacheProvider, SerializingCacheProvider
#
# SerializingCacheProvider serialises the contents of the row and stores
# it in native memory, i.e., off the JVM Heap. Serialized rows take
# significantly less memory than "live" rows in the JVM, so you can cache
# more rows in a given memory footprint. And storing the cache off-heap
# means you can use smaller heap sizes, reducing the impact of GC pauses.
# Note however that when a row is requested from the row cache, it must be
# deserialized into the heap for use.
#
# It is also valid to specify the fully-qualified class name to a class
# that implements org.apache.cassandra.cache.IRowCacheProvider.
#
# Defaults to SerializingCacheProvider
row_cache_provider: SerializingCacheProvider
# saved caches
saved_caches_directory: target/embeddedCassandra/saved_caches
@@ -230,31 +212,6 @@ seed_provider:
# Ex: "<ip1>,<ip2>,<ip3>"
- seeds: "127.0.0.1"
# emergency pressure valve: each time heap usage after a full (CMS)
# garbage collection is above this fraction of the max, Cassandra will
# flush the largest memtables.
#
# Set to 1.0 to disable. Setting this lower than
# CMSInitiatingOccupancyFraction is not likely to be useful.
#
# RELYING ON THIS AS YOUR PRIMARY TUNING MECHANISM WILL WORK POORLY:
# it is most effective under light to moderate load, or read-heavy
# workloads; under truly massive write load, it will often be too
# little, too late.
flush_largest_memtables_at: 0.75
# emergency pressure valve #2: the first time heap usage after a full
# (CMS) garbage collection is above this fraction of the max,
# Cassandra will reduce cache maximum _capacity_ to the given fraction
# of the current _size_. Should usually be set substantially above
# flush_largest_memtables_at, since that will have less long-term
# impact on the system.
#
# Set to 1.0 to disable. Setting this lower than
# CMSInitiatingOccupancyFraction is not likely to be useful.
reduce_cache_sizes_at: 0.85
reduce_cache_capacity_to: 0.6
# For workloads with more data than can fit in memory, Cassandra's
# bottleneck will be reads that need to fetch data from
# disk. "concurrent_reads" should be set to (16 * number_of_drives) in