Better type conversion when stored object type differs from requested type. Fixes for updates that require the vclock to be sent back to Riak for object updates.
This commit is contained in:
@@ -30,7 +30,9 @@ import org.springframework.core.convert.support.ConversionServiceFactory;
|
||||
import org.springframework.data.keyvalue.riak.DataStoreOperationException;
|
||||
import org.springframework.data.keyvalue.riak.convert.KeyValueStoreMetaData;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpInputMessage;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.http.client.ClientHttpRequestFactory;
|
||||
import org.springframework.http.converter.HttpMessageConverter;
|
||||
import org.springframework.http.converter.json.MappingJacksonHttpMessageConverter;
|
||||
@@ -40,7 +42,9 @@ import org.springframework.util.StringUtils;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
import org.springframework.web.client.support.RestGatewaySupport;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.lang.annotation.Annotation;
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
@@ -60,6 +64,7 @@ import java.util.regex.Pattern;
|
||||
public abstract class AbstractRiakTemplate extends RestGatewaySupport implements InitializingBean {
|
||||
|
||||
protected static final String RIAK_META_CLASSNAME = "X-Riak-Meta-ClassName";
|
||||
protected static final String RIAK_VCLOCK = "X-Riak-Vclock";
|
||||
|
||||
/**
|
||||
* Regex used to extract host, port, and prefix from the given URI.
|
||||
@@ -112,7 +117,9 @@ public abstract class AbstractRiakTemplate extends RestGatewaySupport implements
|
||||
/**
|
||||
* A list of resolvers to turn a single object into a {@link BucketKeyPair}.
|
||||
*/
|
||||
protected List<BucketKeyResolver> bucketKeyResolvers;
|
||||
protected List<BucketKeyResolver> bucketKeyResolvers = new ArrayList<BucketKeyResolver>() {{
|
||||
add(new SimpleBucketKeyResolver());
|
||||
}};
|
||||
/**
|
||||
* The default QosParameters to use for all operations through this template.
|
||||
*/
|
||||
@@ -220,10 +227,6 @@ public abstract class AbstractRiakTemplate extends RestGatewaySupport implements
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
Assert.notNull(conversionService,
|
||||
"Must specify a valid ConversionService.");
|
||||
if (null == bucketKeyResolvers) {
|
||||
bucketKeyResolvers = new ArrayList<BucketKeyResolver>();
|
||||
bucketKeyResolvers.add(new SimpleBucketKeyResolver());
|
||||
}
|
||||
|
||||
List<HttpMessageConverter<?>> converters = getRestTemplate().getMessageConverters();
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
@@ -323,6 +326,49 @@ public abstract class AbstractRiakTemplate extends RestGatewaySupport implements
|
||||
return meta;
|
||||
}
|
||||
|
||||
@SuppressWarnings({"unchecked"})
|
||||
protected <T> RiakValue<T> extractValue(final ResponseEntity<?> response, Class<?> origType, Class<T> requiredType) throws
|
||||
IOException {
|
||||
if (response.hasBody()) {
|
||||
RiakMetaData meta = extractMetaData(response.getHeaders());
|
||||
Object o = response.getBody();
|
||||
if (!origType.equals(requiredType)) {
|
||||
if (conversionService.canConvert(origType, requiredType)) {
|
||||
o = conversionService.convert(o, requiredType);
|
||||
} else {
|
||||
if (o instanceof byte[] || o instanceof String) {
|
||||
// Peek inside, see if it's a string of something we recognize
|
||||
String s = (o instanceof byte[] ? new String((byte[]) o) : (String) o);
|
||||
if (s.charAt(0) == '{' || s.charAt(0) == '[') {
|
||||
// Looks like it might be a JSON string. Use the JSON converter
|
||||
for (HttpMessageConverter conv : getRestTemplate().getMessageConverters()) {
|
||||
if (conv instanceof MappingJacksonHttpMessageConverter) {
|
||||
o = conv.read(requiredType, new HttpInputMessage() {
|
||||
public InputStream getBody() throws IOException {
|
||||
Object body = response.getBody();
|
||||
return new ByteArrayInputStream((body instanceof byte[] ? (byte[]) body : ((String) body)
|
||||
.getBytes()));
|
||||
}
|
||||
|
||||
public HttpHeaders getHeaders() {
|
||||
return response.getHeaders();
|
||||
}
|
||||
});
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
} else {
|
||||
throw new DataStoreOperationException("Cannot convert object of type " + origType + " to type " + requiredType);
|
||||
}
|
||||
}
|
||||
}
|
||||
return new RiakValue<T>((T) o, meta);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@SuppressWarnings({"unchecked"})
|
||||
protected <K, T> T checkCache(K key, Class<T> requiredType) {
|
||||
BucketKeyPair bucketKeyPair = resolveBucketKeyPair(key, requiredType);
|
||||
@@ -396,4 +442,30 @@ public abstract class AbstractRiakTemplate extends RestGatewaySupport implements
|
||||
return headers;
|
||||
}
|
||||
|
||||
protected <B, K> Class<?> getType(B bucket, K key) {
|
||||
HttpHeaders headers = getRestTemplate().headForHeaders(defaultUri, bucket, key);
|
||||
Class<?> clazz = null;
|
||||
if (null != headers) {
|
||||
String s = headers.getFirst(RIAK_META_CLASSNAME);
|
||||
if (null != s) {
|
||||
try {
|
||||
clazz = Class.forName(s);
|
||||
} catch (ClassNotFoundException ignored) {
|
||||
}
|
||||
}
|
||||
}
|
||||
if (null == clazz) {
|
||||
if (headers.getContentType().equals(MediaType.APPLICATION_JSON)) {
|
||||
clazz = Map.class;
|
||||
} else if (headers.getContentType().equals(MediaType.TEXT_PLAIN)) {
|
||||
clazz = String.class;
|
||||
} else {
|
||||
// handle as bytes
|
||||
log.error("Need to handle bytes!");
|
||||
clazz = byte[].class;
|
||||
}
|
||||
}
|
||||
return clazz;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -20,14 +20,17 @@ package org.springframework.data.keyvalue.riak.core;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.dao.DataAccessResourceFailureException;
|
||||
import org.springframework.data.keyvalue.riak.DataStoreOperationException;
|
||||
import org.springframework.http.HttpEntity;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.http.client.ClientHttpRequestFactory;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.web.client.ResourceAccessException;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -89,8 +92,18 @@ public class AsyncRiakTemplate extends AbstractRiakTemplate implements AsyncBuck
|
||||
Assert.notNull(key, "Cannot use a <NULL> key.");
|
||||
String keyName = (null != qosParams ? key.toString() + extractQosParameters(qosParams) : key
|
||||
.toString());
|
||||
|
||||
KeyValueStoreMetaData origMeta = getMetaData(bucket, keyName);
|
||||
String vclock = null;
|
||||
if (null != origMeta) {
|
||||
vclock = origMeta.getProperties().get(RIAK_VCLOCK).toString();
|
||||
}
|
||||
|
||||
HttpHeaders headers = defaultHeaders(metaData);
|
||||
headers.setContentType(extractMediaType(value));
|
||||
if (null != vclock) {
|
||||
headers.set(RIAK_VCLOCK, vclock);
|
||||
}
|
||||
headers.set(RIAK_META_CLASSNAME, value.getClass().getName());
|
||||
HttpEntity<V> entity = new HttpEntity<V>(value, headers);
|
||||
return (Future<V>) workerPool.submit(new AsyncPost<V>(bucketName,
|
||||
@@ -103,17 +116,26 @@ public class AsyncRiakTemplate extends AbstractRiakTemplate implements AsyncBuck
|
||||
return getWithMetaData(bucket, key, null, callback);
|
||||
}
|
||||
|
||||
public <B, K> RiakMetaData getMetaData(B bucket, K key) {
|
||||
RestTemplate restTemplate = getRestTemplate();
|
||||
HttpHeaders headers;
|
||||
try {
|
||||
headers = restTemplate.headForHeaders(defaultUri, bucket, key);
|
||||
return extractMetaData(headers);
|
||||
} catch (ResourceAccessException e) {
|
||||
} catch (IOException e) {
|
||||
throw new DataAccessResourceFailureException(e.getMessage(), e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@SuppressWarnings({"unchecked"})
|
||||
public <B, K, T> Future<?> getWithMetaData(B bucket, K key, Class<T> requiredType, AsyncKeyValueStoreOperation<T> callback) {
|
||||
String bucketName = (null != bucket ? bucket.toString() : requiredType.getName());
|
||||
// Get a key name that may or may not include the QOS parameters.
|
||||
Assert.notNull(key, "Cannot use a <NULL> key.");
|
||||
if (null == requiredType) {
|
||||
try {
|
||||
requiredType = (Class<T>) getType(bucketName, key.toString());
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new DataStoreOperationException(e.getMessage(), e);
|
||||
}
|
||||
requiredType = (Class<T>) getType(bucketName, key.toString());
|
||||
}
|
||||
return workerPool.submit(new AsyncGet<T>(bucketName,
|
||||
key.toString(),
|
||||
@@ -229,32 +251,6 @@ public class AsyncRiakTemplate extends AbstractRiakTemplate implements AsyncBuck
|
||||
return setWithMetaData(bucket, key, value, metaData, null, callback);
|
||||
}
|
||||
|
||||
protected Class<?> getType(String bucket, String key) throws ClassNotFoundException {
|
||||
HttpHeaders headers = getRestTemplate().headForHeaders(defaultUri, bucket, key);
|
||||
Class<?> clazz = null;
|
||||
if (null != headers) {
|
||||
String s = headers.getFirst(RIAK_META_CLASSNAME);
|
||||
if (null != s) {
|
||||
try {
|
||||
clazz = Class.forName(s);
|
||||
} catch (ClassNotFoundException ignored) {
|
||||
if (headers.getContentType().equals(MediaType.APPLICATION_JSON)) {
|
||||
clazz = Map.class;
|
||||
} else if (headers.getContentType().equals(MediaType.TEXT_PLAIN)) {
|
||||
clazz = String.class;
|
||||
} else {
|
||||
// handle as bytes
|
||||
log.error("Need to handle bytes!");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (null == clazz) {
|
||||
clazz = byte[].class;
|
||||
}
|
||||
return clazz;
|
||||
}
|
||||
|
||||
protected class AsyncPost<V> implements Runnable {
|
||||
|
||||
private String bucket;
|
||||
|
||||
@@ -113,41 +113,33 @@ public class RiakTemplate extends AbstractRiakTemplate implements BucketKeyValue
|
||||
}
|
||||
|
||||
public <B, K> BucketKeyValueStoreOperations setAsBytes(B bucket, K key, byte[] value, QosParameters qosParams) {
|
||||
Assert.notNull(key, "Key cannot be null!");
|
||||
// If I don't give a bucket name, since I don't have an object type, use 'bytes'
|
||||
String bucketName = (null != bucket ? bucket.toString() : "bytes");
|
||||
// Get a key name that may or may not include the QOS parameters.
|
||||
String keyName = (null != qosParams ? key.toString() + extractQosParameters(qosParams) : key
|
||||
.toString());
|
||||
RestTemplate restTemplate = getRestTemplate();
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.set("X-Riak-ClientId", RIAK_CLIENT_ID);
|
||||
headers.setContentType(MediaType.APPLICATION_OCTET_STREAM);
|
||||
HttpEntity<byte[]> entity = new HttpEntity<byte[]>(value, headers);
|
||||
try {
|
||||
restTemplate.put(defaultUri, entity, bucketName, keyName);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(String.format("PUT byte[]: bucket=%s, key=%s", bucketName, keyName));
|
||||
}
|
||||
} catch (RestClientException e) {
|
||||
throw new DataStoreOperationException(e.getMessage(), e);
|
||||
}
|
||||
return this;
|
||||
return setWithMetaData(bucket, key, value, null, qosParams);
|
||||
}
|
||||
|
||||
public <B, K, V> BucketKeyValueStoreOperations setWithMetaData(B bucket, K key, V value, Map<String, String> metaData, QosParameters qosParams) {
|
||||
Assert.notNull(key, "Key cannot be null!");
|
||||
// Get a key name that may or may not include the QOS parameters.
|
||||
String keyName = (null != qosParams ? key.toString() + extractQosParameters(qosParams) : key
|
||||
.toString());
|
||||
|
||||
KeyValueStoreMetaData origMeta = getMetaData(bucket, keyName);
|
||||
String vclock = null;
|
||||
if (null != origMeta) {
|
||||
vclock = origMeta.getProperties().get(RIAK_VCLOCK).toString();
|
||||
}
|
||||
RestTemplate restTemplate = getRestTemplate();
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.set("X-Riak-ClientId", RIAK_CLIENT_ID);
|
||||
headers.setContentType(extractMediaType(value));
|
||||
if (null != vclock) {
|
||||
headers.set(RIAK_VCLOCK, vclock);
|
||||
}
|
||||
if (null != metaData) {
|
||||
for (Map.Entry<String, String> entry : metaData.entrySet()) {
|
||||
headers.set(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
headers.set(RIAK_META_CLASSNAME, value.getClass().getName());
|
||||
HttpEntity<V> entity = new HttpEntity<V>(value, headers);
|
||||
try {
|
||||
restTemplate.put(defaultUri, entity, bucket, keyName);
|
||||
@@ -186,6 +178,7 @@ public class RiakTemplate extends AbstractRiakTemplate implements BucketKeyValue
|
||||
* @return The generated ID
|
||||
*/
|
||||
public <B, V> String put(B bucket, V value, Map<String, String> metaData) {
|
||||
Assert.notNull(bucket, "Bucket cannot be null.");
|
||||
String bucketName = bucket.toString();
|
||||
RestTemplate restTemplate = getRestTemplate();
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
@@ -196,6 +189,7 @@ public class RiakTemplate extends AbstractRiakTemplate implements BucketKeyValue
|
||||
headers.set(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
headers.set(RIAK_META_CLASSNAME, value.getClass().getName());
|
||||
HttpEntity<V> entity = new HttpEntity<V>(value, headers);
|
||||
try {
|
||||
URI uri = restTemplate.postForLocation(defaultUri, entity, bucketName, "");
|
||||
@@ -214,7 +208,7 @@ public class RiakTemplate extends AbstractRiakTemplate implements BucketKeyValue
|
||||
|
||||
public <B, K> RiakMetaData getMetaData(B bucket, K key) {
|
||||
RestTemplate restTemplate = getRestTemplate();
|
||||
HttpHeaders headers = null;
|
||||
HttpHeaders headers;
|
||||
try {
|
||||
headers = restTemplate.headForHeaders(defaultUri, bucket, key);
|
||||
return extractMetaData(headers);
|
||||
@@ -225,6 +219,7 @@ public class RiakTemplate extends AbstractRiakTemplate implements BucketKeyValue
|
||||
return null;
|
||||
}
|
||||
|
||||
@SuppressWarnings({"unchecked"})
|
||||
public <B, K, T> RiakValue<T> getWithMetaData(B bucket, K key, Class<T> requiredType) {
|
||||
// If no bucket name is given, infer it from the type name.
|
||||
String bucketName = (null != bucket ? bucket.toString() : requiredType.getName());
|
||||
@@ -235,44 +230,74 @@ public class RiakTemplate extends AbstractRiakTemplate implements BucketKeyValue
|
||||
key,
|
||||
requiredType.getName()));
|
||||
}
|
||||
|
||||
Class<?> origType = getType(bucket, key);
|
||||
RiakValue<T> val = null;
|
||||
try {
|
||||
ResponseEntity<T> result = restTemplate.getForEntity(defaultUri,
|
||||
ResponseEntity<?> result = restTemplate.getForEntity(defaultUri,
|
||||
requiredType,
|
||||
bucketName,
|
||||
key);
|
||||
if (result.hasBody()) {
|
||||
RiakMetaData meta = extractMetaData(result.getHeaders());
|
||||
RiakValue<T> val = new RiakValue<T>(result.getBody(), meta);
|
||||
if (useCache) {
|
||||
cache.put(new SimpleBucketKeyPair<Object, Object>(bucket, key), val);
|
||||
}
|
||||
return val;
|
||||
}
|
||||
val = extractValue(result, requiredType, requiredType);
|
||||
} catch (HttpClientErrorException e) {
|
||||
switch (e.getStatusCode()) {
|
||||
case NOT_ACCEPTABLE:
|
||||
// Can't convert using HttpMessageConverter. Try fetching as the original type
|
||||
// and using the conversion service to convert.
|
||||
ResponseEntity<?> result = restTemplate.getForEntity(defaultUri,
|
||||
origType,
|
||||
bucketName,
|
||||
key);
|
||||
try {
|
||||
val = extractValue(result, origType, requiredType);
|
||||
} catch (IOException ioe) {
|
||||
throw new DataStoreOperationException(ioe.getMessage(), ioe);
|
||||
}
|
||||
case NOT_FOUND:
|
||||
// IGNORED
|
||||
break;
|
||||
default:
|
||||
throw new DataStoreOperationException(e.getMessage(), e);
|
||||
}
|
||||
if (e.getStatusCode() != HttpStatus.NOT_FOUND) {
|
||||
throw new DataStoreOperationException(e.getMessage(), e);
|
||||
}
|
||||
} catch (RestClientException rce) {
|
||||
// IGNORE
|
||||
if (rce.getMessage().contains("HTTP response code: 406")) {
|
||||
// Can't convert using HttpMessageConverter. Try fetching as the original type
|
||||
// and using the conversion service to convert.
|
||||
ResponseEntity<?> result = restTemplate.getForEntity(defaultUri,
|
||||
origType,
|
||||
bucketName,
|
||||
key);
|
||||
try {
|
||||
val = extractValue(result, origType, requiredType);
|
||||
} catch (IOException ioe) {
|
||||
throw new DataStoreOperationException(rce.getMessage(), rce);
|
||||
}
|
||||
} else {
|
||||
// IGNORE
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("RestClientException: " + rce.getMessage());
|
||||
}
|
||||
}
|
||||
} catch (EOFException eof) {
|
||||
// IGNORE
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("EOFException: " + eof.getMessage(), eof);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
return null;
|
||||
|
||||
if (null != val && useCache) {
|
||||
cache.put(new SimpleBucketKeyPair<Object, Object>(bucket, key), val);
|
||||
}
|
||||
return val;
|
||||
}
|
||||
|
||||
@SuppressWarnings({"unchecked"})
|
||||
public <B, K, T> T get(B bucket, K key) {
|
||||
Class targetClass;
|
||||
try {
|
||||
// Since no type is specified, first try using the bucket name as the target class...
|
||||
targetClass = Class.forName(bucket.toString());
|
||||
} catch (Throwable ignored) {
|
||||
// ...if that doesn't work, just use a Map, which we know will work.
|
||||
targetClass = Map.class;
|
||||
}
|
||||
Class targetClass = getType(bucket, key);
|
||||
RiakValue<T> obj = getWithMetaData(bucket, key, targetClass);
|
||||
return (null != obj ? obj.get() : null);
|
||||
}
|
||||
@@ -537,6 +562,7 @@ public class RiakTemplate extends AbstractRiakTemplate implements BucketKeyValue
|
||||
}
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.setContentType(fromObj.getMetaData().getContentType());
|
||||
headers.set(RIAK_VCLOCK, fromObj.getMetaData().getProperties().get(RIAK_VCLOCK).toString());
|
||||
Object linksObj = fromObj.getMetaData().getProperties().get("Link");
|
||||
List<String> links = new ArrayList<String>();
|
||||
// First add all existing links...
|
||||
@@ -630,6 +656,8 @@ public class RiakTemplate extends AbstractRiakTemplate implements BucketKeyValue
|
||||
final BodyPart partBody = part.getBodyPart(j);
|
||||
String partType = partBody.getContentType();
|
||||
String link = partBody.getHeader("Link")[0];
|
||||
String location = partBody.getHeader("Location")[0];
|
||||
String key = location.substring(location.lastIndexOf("/") + 1);
|
||||
String[] links = StringUtils.delimitedListToStringArray(link, ",");
|
||||
String bucketName = null;
|
||||
for (String s : links) {
|
||||
@@ -642,16 +670,8 @@ public class RiakTemplate extends AbstractRiakTemplate implements BucketKeyValue
|
||||
}
|
||||
}
|
||||
Class<?> clazz = requiredType;
|
||||
if (null == clazz && null != bucketName) {
|
||||
try {
|
||||
clazz = Class.forName(bucketName);
|
||||
} catch (ClassNotFoundException e) {
|
||||
// Default to a Map. We know that will work.
|
||||
clazz = Map.class;
|
||||
}
|
||||
} else {
|
||||
// Default to a Map. We know that will work.
|
||||
clazz = Map.class;
|
||||
if (null == clazz) {
|
||||
clazz = getType(bucketName, key);
|
||||
}
|
||||
|
||||
// Can convert message?
|
||||
@@ -676,7 +696,9 @@ public class RiakTemplate extends AbstractRiakTemplate implements BucketKeyValue
|
||||
}
|
||||
}
|
||||
|
||||
log.debug(String.format("results=%s", results));
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(String.format("results=%s", results));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ import groovy.lang.Closure;
|
||||
import groovy.util.BuilderSupport;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.keyvalue.riak.core.AsyncRiakTemplate;
|
||||
import org.springframework.data.keyvalue.riak.core.RiakQosParameters;
|
||||
|
||||
@@ -35,7 +36,9 @@ import java.util.concurrent.Executors;
|
||||
public class RiakBuilder extends BuilderSupport {
|
||||
|
||||
protected final Logger log = LoggerFactory.getLogger(getClass());
|
||||
@Autowired
|
||||
protected AsyncRiakTemplate riak;
|
||||
@Autowired
|
||||
protected ExecutorService workerPool = Executors.newCachedThreadPool();
|
||||
|
||||
public RiakBuilder(AsyncRiakTemplate riak) {
|
||||
@@ -112,8 +115,17 @@ public class RiakBuilder extends BuilderSupport {
|
||||
}
|
||||
|
||||
o = attributes.get("wait");
|
||||
if (null != o && o instanceof Long) {
|
||||
op.setTimeout((Long) o);
|
||||
if (null != o) {
|
||||
if (o instanceof Long) {
|
||||
op.setTimeout((Long) o);
|
||||
} else if (o instanceof String) {
|
||||
op.setTimeout(new Long(o.toString()));
|
||||
} else if (o instanceof Integer) {
|
||||
op.setTimeout(new Long((Integer) o));
|
||||
} else {
|
||||
throw new IllegalArgumentException(
|
||||
"Timeout should be an Integer, a Long, or a String denoting milliseconds");
|
||||
}
|
||||
}
|
||||
return op;
|
||||
}
|
||||
|
||||
@@ -149,7 +149,21 @@ public class RiakOperation<T> implements Callable {
|
||||
f = riak.delete(bucket, key, callbackInvoker);
|
||||
break;
|
||||
}
|
||||
return null != f && timeout > 0 ? (T) f.get(timeout, TimeUnit.MILLISECONDS) : null;
|
||||
|
||||
if (null != f) {
|
||||
if (timeout == 0) {
|
||||
// Don't wait at all
|
||||
return (T) f;
|
||||
} else if (timeout > 0) {
|
||||
// Block until finished or timeout
|
||||
return (T) f.get(timeout, TimeUnit.MILLISECONDS);
|
||||
} else {
|
||||
// Block indefinitely
|
||||
return (T) f.get();
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
class GuardedClosure {
|
||||
|
||||
@@ -43,7 +43,7 @@ class RiakBuilderSpec extends Specification {
|
||||
def result = null
|
||||
|
||||
when:
|
||||
riak.set(bucket: "test", key: "test", qos: [dw: "all"], value: obj, wait: 3000L) {
|
||||
riak.set(bucket: "test", key: "test", qos: [dw: "all"], value: obj) {
|
||||
|
||||
completed(when: { v -> v.integer == 12 }) { v, meta ->
|
||||
result = v.test
|
||||
@@ -66,7 +66,7 @@ class RiakBuilderSpec extends Specification {
|
||||
def result = null
|
||||
|
||||
when:
|
||||
riak.get(bucket: "test", key: "test", wait: 3000L) {
|
||||
riak.get(bucket: "test", key: "test") {
|
||||
|
||||
completed(when: { v -> v.integer == 12 }) { v, meta ->
|
||||
result = v.test
|
||||
@@ -90,7 +90,7 @@ class RiakBuilderSpec extends Specification {
|
||||
def result = null
|
||||
|
||||
when:
|
||||
riak.setAsBytes(bucket: "test", key: "test", value: obj, qos: [dw: "all"], wait: 3000L) {
|
||||
riak.setAsBytes(bucket: "test", key: "test", value: obj, qos: [dw: "all"]) {
|
||||
completed { v -> result = "success" }
|
||||
failed { e -> result = "failure" }
|
||||
}
|
||||
|
||||
@@ -35,24 +35,39 @@ class RiakTemplateSpec extends Specification {
|
||||
|
||||
@Autowired
|
||||
ApplicationContext appCtx
|
||||
@Autowired
|
||||
RiakTemplate riak
|
||||
@Shared RiakTemplate riak = new RiakTemplate()
|
||||
int run = 1
|
||||
@Shared def riakBin = System.properties["bamboo.RIAK_BIN"] ?: "/usr/sbin/riak"
|
||||
@Shared def p
|
||||
@Shared def id
|
||||
/*
|
||||
@Shared boolean shutdown = false
|
||||
|
||||
def setupSpec() {
|
||||
p = "$riakBin start".execute()
|
||||
p.waitFor()
|
||||
Thread.sleep(2000)
|
||||
RiakQosParameters qos = new RiakQosParameters()
|
||||
qos.setDurableWriteThreshold("all")
|
||||
riak.setDefaultQosParameters(qos)
|
||||
|
||||
if (!riak.get("status", "")) {
|
||||
p = "$riakBin start".execute()
|
||||
p.waitFor()
|
||||
shutdown = true
|
||||
Thread.sleep(2000)
|
||||
}
|
||||
|
||||
riak.getBucketSchema("test", true).keys.each {
|
||||
riak.delete("test", it)
|
||||
}
|
||||
riak.getBucketSchema(TestObject.name, true).keys.each {
|
||||
riak.delete("test", it)
|
||||
}
|
||||
}
|
||||
|
||||
def cleanupSpec() {
|
||||
p = "$riakBin stop".execute()
|
||||
p.waitFor()
|
||||
if (shutdown) {
|
||||
p = "$riakBin stop".execute()
|
||||
p.waitFor()
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
def "Test Map object"() {
|
||||
|
||||
@@ -90,13 +105,30 @@ class RiakTemplateSpec extends Specification {
|
||||
riak.set(TestObject.name, "test", objIn)
|
||||
|
||||
when:
|
||||
TestObject objOut = riak.get(TestObject.name, "test")
|
||||
TestObject objOut = riak.getAsType(TestObject.name, "test", TestObject)
|
||||
|
||||
then:
|
||||
objOut.test == "value"
|
||||
|
||||
}
|
||||
|
||||
def "Test convert custom object from bytes"() {
|
||||
|
||||
given:
|
||||
def qos = new RiakQosParameters()
|
||||
qos.durableWriteThreshold = "all"
|
||||
riak.setAsBytes(TestObject.name, "test", "{\"test\":\"string data\",\"integer\":1}".bytes, qos)
|
||||
|
||||
when:
|
||||
def objOut = riak.getAsType(TestObject.name, "test", TestObject)
|
||||
//riak.delete(TestObject.name, "test")
|
||||
|
||||
then:
|
||||
objOut instanceof TestObject
|
||||
objOut.test == "string data"
|
||||
|
||||
}
|
||||
|
||||
def "Test getting bucket schema"() {
|
||||
|
||||
when:
|
||||
@@ -155,6 +187,9 @@ class RiakTemplateSpec extends Specification {
|
||||
def "Test linking"() {
|
||||
|
||||
given:
|
||||
def qos = new RiakQosParameters()
|
||||
qos.durableWriteThreshold = "all"
|
||||
riak.set(TestObject.name, "test", new TestObject(), qos)
|
||||
riak.link(TestObject.name, "test", "test", "test", "test")
|
||||
|
||||
when:
|
||||
@@ -194,7 +229,7 @@ class RiakTemplateSpec extends Specification {
|
||||
|
||||
given:
|
||||
def i = run++
|
||||
def newObj = [test: "value $i", integer: 12]
|
||||
def newObj = [test: "value $i".toString(), integer: 12]
|
||||
|
||||
when:
|
||||
def oldObj = riak.getAndSet("test", "test", newObj)
|
||||
@@ -224,7 +259,7 @@ class RiakTemplateSpec extends Specification {
|
||||
def result = riak.execute(job, Integer)
|
||||
|
||||
then:
|
||||
1 == result
|
||||
2 == result
|
||||
|
||||
}
|
||||
|
||||
@@ -248,7 +283,7 @@ class RiakTemplateSpec extends Specification {
|
||||
|
||||
then:
|
||||
1 == result.size()
|
||||
1 == result[0]
|
||||
2 == result[0]
|
||||
|
||||
}
|
||||
|
||||
@@ -273,13 +308,8 @@ class RiakTemplateSpec extends Specification {
|
||||
|
||||
def "Test delete key"() {
|
||||
|
||||
given:
|
||||
def testKey = new SimpleBucketKeyPair("test", "test")
|
||||
def testKey2 = new SimpleBucketKeyPair(TestObject.name, "test")
|
||||
def testKey3 = new SimpleBucketKeyPair("test", id)
|
||||
|
||||
when:
|
||||
def deleted = riak.deleteKeys(testKey, testKey2, testKey3)
|
||||
def deleted = riak.deleteKeys("test:test", "${TestObject.name}:test", "test:$id")
|
||||
|
||||
then:
|
||||
true == deleted
|
||||
|
||||
Reference in New Issue
Block a user