DATACOUCH-65 - Add support for optimistic locking through @Version

@Version support is built on top of CAS values in couchbase. It does
not store an actual version number in the document, but rather reuses
cas values transparently to handle optimistic locking.

Also, write result checking has been added to the template for
better control what happens if a write fails. Defaults to NONE,
but can be sett to LOG or EXCEPTION.
This commit is contained in:
Michael Nitschinger
2014-01-24 11:58:55 +01:00
parent bb6385e08a
commit 11ed095c58
7 changed files with 360 additions and 24 deletions

View File

@@ -0,0 +1,38 @@
/*
* Copyright 2013, 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.couchbase.core;
import org.springframework.dao.DataIntegrityViolationException;
/**
* A Couchbase specific integrity violation exception, thrown as a result of failing db operations.
*
* @author Michael Nitschinger
*/
public class CouchbaseDataIntegrityViolationException extends DataIntegrityViolationException {
private static final long serialVersionUID = -3724991479213025850L;
public CouchbaseDataIntegrityViolationException(String msg) {
super(msg);
}
public CouchbaseDataIntegrityViolationException(String msg, Throwable cause) {
super(msg, cause);
}
}

View File

@@ -21,9 +21,14 @@ import com.couchbase.client.protocol.views.Query;
import com.couchbase.client.protocol.views.View;
import com.couchbase.client.protocol.views.ViewResponse;
import com.couchbase.client.protocol.views.ViewRow;
import net.spy.memcached.CASResponse;
import net.spy.memcached.CASValue;
import net.spy.memcached.PersistTo;
import net.spy.memcached.ReplicateTo;
import net.spy.memcached.internal.OperationFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.dao.QueryTimeoutException;
import org.springframework.data.couchbase.core.convert.CouchbaseConverter;
import org.springframework.data.couchbase.core.convert.MappingCouchbaseConverter;
@@ -34,7 +39,9 @@ import org.springframework.data.couchbase.core.mapping.CouchbaseMappingContext;
import org.springframework.data.couchbase.core.mapping.CouchbasePersistentEntity;
import org.springframework.data.couchbase.core.mapping.CouchbasePersistentProperty;
import org.springframework.data.couchbase.core.mapping.CouchbaseStorable;
import org.springframework.data.mapping.PersistentEntity;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.mapping.model.BeanWrapper;
import java.util.ArrayList;
import java.util.Collection;
@@ -51,13 +58,18 @@ import java.util.concurrent.TimeoutException;
*/
public class CouchbaseTemplate implements CouchbaseOperations {
private final CouchbaseClient client;
private CouchbaseConverter couchbaseConverter;
protected final MappingContext<? extends CouchbasePersistentEntity<?>, CouchbasePersistentProperty> mappingContext;
private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseTemplate.class);
private static final Collection<String> ITERABLE_CLASSES;
private static final WriteResultChecking DEFAULT_WRITE_RESULT_CHECKING = WriteResultChecking.NONE;
private final CouchbaseClient client;
protected final MappingContext<? extends CouchbasePersistentEntity<?>, CouchbasePersistentProperty> mappingContext;
private final CouchbaseExceptionTranslator exceptionTranslator = new CouchbaseExceptionTranslator();
private final TranslationService<String> translationService;
private CouchbaseConverter couchbaseConverter;
private WriteResultChecking writeResultChecking = DEFAULT_WRITE_RESULT_CHECKING;
static {
final Set<String> iterableClasses = new HashSet<String>();
iterableClasses.add(List.class.getName());
@@ -66,6 +78,10 @@ public class CouchbaseTemplate implements CouchbaseOperations {
ITERABLE_CLASSES = Collections.unmodifiableCollection(iterableClasses);
}
public void setWriteResultChecking(final WriteResultChecking resultChecking) {
writeResultChecking = resultChecking == null ? DEFAULT_WRITE_RESULT_CHECKING : resultChecking;
}
public CouchbaseTemplate(final CouchbaseClient client) {
this(client, null, null);
}
@@ -100,6 +116,7 @@ public class CouchbaseTemplate implements CouchbaseOperations {
return translationService.encode(source);
}
private CouchbaseStorable translateDecode(final String source, final CouchbaseStorable target) {
return translationService.decode(source, target);
}
@@ -142,10 +159,10 @@ public class CouchbaseTemplate implements CouchbaseOperations {
@Override
public final <T> T findById(final String id, final Class<T> entityClass) {
String result = execute(new BucketCallback<String>() {
CASValue result = execute(new BucketCallback<CASValue>() {
@Override
public String doInBucket() {
return (String) client.get(id);
public CASValue doInBucket() {
return client.gets(id);
}
});
@@ -154,15 +171,25 @@ public class CouchbaseTemplate implements CouchbaseOperations {
}
final CouchbaseDocument converted = new CouchbaseDocument(id);
return couchbaseConverter.read(entityClass, (CouchbaseDocument) translateDecode(result, converted));
Object readEntity = couchbaseConverter.read(entityClass, (CouchbaseDocument) translateDecode(
(String) result.getValue(), converted));
final BeanWrapper<PersistentEntity<Object, ?>, Object> beanWrapper = BeanWrapper.create(readEntity,
couchbaseConverter.getConversionService());
CouchbasePersistentEntity<?> persistentEntity = mappingContext.getPersistentEntity(readEntity.getClass());
if (persistentEntity.hasVersionProperty()) {
beanWrapper.setProperty(persistentEntity.getVersionProperty(), result.getCas());
}
return (T) readEntity;
}
@Override
public <T> List<T> findByView(final String designName, final String viewName, final Query query, final Class<T> entityClass) {
if (!query.willIncludeDocs()) {
query.setIncludeDocs(true);
if (query.willIncludeDocs()) {
query.setIncludeDocs(false);
}
if (query.willReduce()) {
query.setReduce(false);
@@ -172,8 +199,7 @@ public class CouchbaseTemplate implements CouchbaseOperations {
final List<T> result = new ArrayList<T>(response.size());
for (final ViewRow row : response) {
final CouchbaseDocument converted = new CouchbaseDocument(row.getId());
result.add(couchbaseConverter.read(entityClass, (CouchbaseDocument) translateDecode((String) row.getDocument(), converted)));
result.add(findById(row.getId(), entityClass));
}
return result;
@@ -250,14 +276,38 @@ public class CouchbaseTemplate implements CouchbaseOperations {
public void save(Object objectToSave, final PersistTo persistTo, final ReplicateTo replicateTo) {
ensureNotIterable(objectToSave);
final BeanWrapper<PersistentEntity<Object, ?>, Object> beanWrapper = BeanWrapper.create(objectToSave,
couchbaseConverter.getConversionService());
CouchbasePersistentEntity<?> persistentEntity = mappingContext.getPersistentEntity(objectToSave.getClass());
final CouchbasePersistentProperty versionProperty = persistentEntity.getVersionProperty();
final Long version = versionProperty != null ? beanWrapper.getProperty(versionProperty, Long.class, true) : null;
final CouchbaseDocument converted = new CouchbaseDocument();
couchbaseConverter.write(objectToSave, converted);
execute(new BucketCallback<Boolean>() {
@Override
public Boolean doInBucket() throws InterruptedException, ExecutionException {
return client.set(converted.getId(), converted.getExpiration(), translateEncode(converted), persistTo,
replicateTo).get();
if (version == null) {
OperationFuture<Boolean> setFuture = client.set(converted.getId(), converted.getExpiration(),
translateEncode(converted), persistTo, replicateTo);
boolean future = setFuture.get();
if (!future) {
handleWriteResultError("Saving document failed: " + setFuture.getStatus().getMessage());
}
return future;
} else {
OperationFuture<CASResponse> casFuture = client.asyncCas(converted.getId(), version,
converted.getExpiration(), translateEncode(converted), persistTo, replicateTo);
CASResponse cas = casFuture.get();
if (cas == CASResponse.EXISTS) {
throw new OptimisticLockingFailureException("Saving document with version value failed: " + cas);
} else {
long newCas = casFuture.getCas();
beanWrapper.setProperty(versionProperty, newCas);
return true;
}
}
}
});
}
@@ -273,14 +323,35 @@ public class CouchbaseTemplate implements CouchbaseOperations {
public void insert(Object objectToInsert, final PersistTo persistTo, final ReplicateTo replicateTo) {
ensureNotIterable(objectToInsert);
final CouchbasePersistentEntity<?> persistentEntity = mappingContext.getPersistentEntity(objectToInsert.getClass());
final BeanWrapper<PersistentEntity<Object,?>, Object> beanWrapper = BeanWrapper.create(objectToInsert,
couchbaseConverter.getConversionService());
if (persistentEntity != null && persistentEntity.hasVersionProperty()) {
final Long version = beanWrapper.getProperty(persistentEntity.getVersionProperty(), Long.class, true);
if (version == 0) {
beanWrapper.setProperty(persistentEntity.getVersionProperty(), 0);
}
}
final CouchbaseDocument converted = new CouchbaseDocument();
couchbaseConverter.write(objectToInsert, converted);
execute(new BucketCallback<Boolean>() {
@Override
public Boolean doInBucket() throws InterruptedException, ExecutionException {
return client.add(converted.getId(), converted.getExpiration(), translateEncode(converted), persistTo,
replicateTo).get();
OperationFuture<Boolean> addFuture = client.add(converted.getId(), converted.getExpiration(),
translateEncode(converted), persistTo, replicateTo);
boolean result = addFuture.get();
if(result == false) {
handleWriteResultError("Inserting document failed: "
+ addFuture.getStatus().getMessage());
}
if (result && persistentEntity.hasVersionProperty()) {
beanWrapper.setProperty(persistentEntity.getVersionProperty(), addFuture.getCas());
}
return result;
}
});
}
@@ -296,14 +367,34 @@ public class CouchbaseTemplate implements CouchbaseOperations {
public void update(Object objectToUpdate, final PersistTo persistTo, final ReplicateTo replicateTo) {
ensureNotIterable(objectToUpdate);
final BeanWrapper<PersistentEntity<Object, ?>, Object> beanWrapper = BeanWrapper.create(objectToUpdate,
couchbaseConverter.getConversionService());
CouchbasePersistentEntity<?> persistentEntity = mappingContext.getPersistentEntity(objectToUpdate.getClass());
final CouchbasePersistentProperty versionProperty = persistentEntity.getVersionProperty();
final Long version = versionProperty != null ? beanWrapper.getProperty(versionProperty, Long.class, true) : null;
final CouchbaseDocument converted = new CouchbaseDocument();
couchbaseConverter.write(objectToUpdate, converted);
execute(new BucketCallback<Boolean>() {
@Override
public Boolean doInBucket() throws InterruptedException, ExecutionException {
return client.replace(converted.getId(), converted.getExpiration(), translateEncode(converted), persistTo,
replicateTo).get();
if (version == null) {
return client.replace(converted.getId(), converted.getExpiration(), translateEncode(converted), persistTo,
replicateTo).get();
} else {
OperationFuture<CASResponse> casFuture = client.asyncCas(converted.getId(), version,
converted.getExpiration(), translateEncode(converted), persistTo, replicateTo);
CASResponse cas = casFuture.get();
if (cas == CASResponse.EXISTS) {
throw new OptimisticLockingFailureException("Updating document with version value failed: " + cas);
} else {
long newCas = casFuture.getCas();
beanWrapper.setProperty(versionProperty, newCas);
return true;
}
}
}
});
}
@@ -346,4 +437,22 @@ public class CouchbaseTemplate implements CouchbaseOperations {
remove(toRemove, persistTo, replicateTo);
}
}
/**
* Handle write errors according to the set {@link #writeResultChecking} setting.
*
* @param message the message to use.
*/
private void handleWriteResultError(String message) {
if (writeResultChecking == WriteResultChecking.NONE) {
return;
}
if (writeResultChecking == WriteResultChecking.EXCEPTION) {
throw new CouchbaseDataIntegrityViolationException(message);
} else {
LOGGER.error(message);
}
}
}

View File

@@ -0,0 +1,40 @@
/*
* Copyright 2013, 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.couchbase.core;
/**
* How failing write results should be handled.
*
* @author Michael Nitschinger
*/
public enum WriteResultChecking {
/**
* Ignore failing write results.
*/
NONE,
/**
* Log failing write results.
*/
LOG,
/**
* Throw Exceptions on failing write results.
*/
EXCEPTION
}

View File

@@ -258,6 +258,8 @@ public class MappingCouchbaseConverter extends AbstractCouchbaseConverter
final BeanWrapper<CouchbasePersistentEntity<Object>, Object> wrapper = BeanWrapper.create(source, conversionService);
final CouchbasePersistentProperty idProperty = entity.getIdProperty();
final CouchbasePersistentProperty versionProperty = entity.getVersionProperty();
if (idProperty != null && target.getId() == null) {
String id = wrapper.getProperty(idProperty, String.class, useFieldAccessOnly);
target.setId(id);
@@ -269,6 +271,9 @@ public class MappingCouchbaseConverter extends AbstractCouchbaseConverter
if (prop.equals(idProperty)) {
return;
}
if (versionProperty != null && prop.equals(versionProperty)) {
return;
}
Object propertyObj = wrapper.getProperty(prop, prop.getType(), useFieldAccessOnly);
if (null != propertyObj) {

View File

@@ -23,6 +23,8 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.env.Environment;
import org.springframework.data.couchbase.config.AbstractCouchbaseConfiguration;
import org.springframework.data.couchbase.core.CouchbaseTemplate;
import org.springframework.data.couchbase.core.WriteResultChecking;
import java.net.URI;
import java.util.Arrays;
@@ -74,5 +76,10 @@ public class TestApplicationConfig extends AbstractCouchbaseConfiguration {
return super.couchbaseClient();
}
@Override
public CouchbaseTemplate couchbaseTemplate() throws Exception {
CouchbaseTemplate template = super.couchbaseTemplate();
template.setWriteResultChecking(WriteResultChecking.LOG);
return template;
}
}

View File

@@ -19,10 +19,13 @@ package org.springframework.data.couchbase.core;
import com.couchbase.client.CouchbaseClient;
import com.couchbase.client.protocol.views.Query;
import com.couchbase.client.protocol.views.Stale;
import net.spy.memcached.CASValue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Version;
import org.springframework.data.couchbase.TestApplicationConfig;
import org.springframework.data.couchbase.core.mapping.Document;
import org.springframework.data.couchbase.core.mapping.Field;
@@ -39,10 +42,7 @@ import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
/**
* @author Michael Nitschinger
@@ -85,8 +85,10 @@ public class CouchbaseTemplateTests {
}
@Test
public void insertDoesNotOverride() {
public void insertDoesNotOverride() throws Exception {
String id = "double-insert-test";
client.delete(id).get();
String expected = "{\"_class\":\"org.springframework.data.couchbase.core."
+ "CouchbaseTemplateTests$SimplePerson\",\"name\":\"Mr. A\"}";
@@ -219,6 +221,108 @@ public class CouchbaseTemplateTests {
assertThat(simpleWithClass.getValue(), equalTo("The dish ran away with the spoon."));
}
@Test
public void shouldHandleCASVersionOnInsert() throws Exception {
client.delete("versionedClass:1").get();
VersionedClass versionedClass = new VersionedClass("versionedClass:1", "foobar");
assertEquals(0, versionedClass.getVersion());
template.insert(versionedClass);
CASValue<Object> rawStored = client.gets("versionedClass:1");
assertEquals(rawStored.getCas(), versionedClass.getVersion());
}
@Test
public void versionShouldNotUpdateOnSecondInsert() throws Exception {
client.delete("versionedClass:2").get();
VersionedClass versionedClass = new VersionedClass("versionedClass:2", "foobar");
template.insert(versionedClass);
long version1 = versionedClass.getVersion();
template.insert(versionedClass);
long version2 = versionedClass.getVersion();
assertTrue(version1 > 0);
assertTrue(version2 > 0);
assertEquals(version1, version2);
}
@Test
public void shouldSaveDocumentOnMatchingVersion() throws Exception {
client.delete("versionedClass:3").get();
VersionedClass versionedClass = new VersionedClass("versionedClass:3", "foobar");
template.insert(versionedClass);
long version1 = versionedClass.getVersion();
versionedClass.setField("foobar2");
template.save(versionedClass);
long version2 = versionedClass.getVersion();
assertTrue(version1 > 0);
assertTrue(version2 > 0);
assertNotEquals(version1, version2);
assertEquals("foobar2", template.findById("versionedClass:3", VersionedClass.class).getField());
}
@Test(expected = OptimisticLockingFailureException.class)
public void shouldNotSaveDocumentOnNotMatchingVersion() throws Exception {
client.delete("versionedClass:4").get();
VersionedClass versionedClass = new VersionedClass("versionedClass:4", "foobar");
template.insert(versionedClass);
assertTrue(client.set("versionedClass:4", "different").get());
versionedClass.setField("foobar2");
template.save(versionedClass);
}
@Test
public void shouldUpdateDocumentOnMatchingVersion() throws Exception {
client.delete("versionedClass:5").get();
VersionedClass versionedClass = new VersionedClass("versionedClass:5", "foobar");
template.insert(versionedClass);
long version1 = versionedClass.getVersion();
versionedClass.setField("foobar2");
template.update(versionedClass);
long version2 = versionedClass.getVersion();
assertTrue(version1 > 0);
assertTrue(version2 > 0);
assertNotEquals(version1, version2);
assertEquals("foobar2", template.findById("versionedClass:5", VersionedClass.class).getField());
}
@Test(expected = OptimisticLockingFailureException.class)
public void shouldNotUpdateDocumentOnNotMatchingVersion() throws Exception {
client.delete("versionedClass:6").get();
VersionedClass versionedClass = new VersionedClass("versionedClass:6", "foobar");
template.insert(versionedClass);
assertTrue(client.set("versionedClass:6", "different").get());
versionedClass.setField("foobar2");
template.update(versionedClass);
}
@Test
public void shouldLoadVersionPropertyOnFind() throws Exception {
client.delete("versionedClass:7").get();
VersionedClass versionedClass = new VersionedClass("versionedClass:7", "foobar");
template.insert(versionedClass);
assertTrue(versionedClass.getVersion() > 0);
VersionedClass foundClass = template.findById("versionedClass:7", VersionedClass.class);
assertEquals(versionedClass.getVersion(), foundClass.getVersion());
}
/**
* A sample document with just an id and property.
*/
@@ -392,4 +496,36 @@ public class CouchbaseTemplateTests {
}
}
static class VersionedClass {
@Id
private String id;
@Version
private long version;
private String field;
VersionedClass(String id, String field) {
this.id = id;
this.field = field;
}
public String getId() {
return id;
}
public long getVersion() {
return version;
}
public String getField() {
return field;
}
public void setField(String field) {
this.field = field;
}
}
}

View File

@@ -13,4 +13,5 @@ Import-Template:
org.springframework.*;version="${spring30:[=.=.=.=,+1.1.0)}",
org.springframework.data.*;version="${springdata.commons:[=.=.=.=,+1.0.0)}",
org.w3c.*;version="0.0.0",
org.aopalliance.*;version="[1.0.0, 2.0.0)";resolution:=optional
org.aopalliance.*;version="[1.0.0, 2.0.0)";resolution:=optional,
org.slf4j.*;version="${slf4j:[=.=.=,+1.0.0)}"