DATACASS-116 - Completed

Enhanced executeAsychronously() API.
This commit is contained in:
David T Webb
2014-03-27 15:34:49 -04:00
parent 9c2cf17c03
commit 3ab62a1dda
2 changed files with 200 additions and 8 deletions

View File

@@ -120,10 +120,61 @@ public interface CqlOperations {
/**
* Executes the supplied Query Asynchronously and returns nothing.
*
* @param cql The {@link Query} to execute
* @param cql The CQL String to execute
*/
void executeAsynchronously(String cql) throws DataAccessException;
/**
* Executes the supplied Query Asynchronously and returns nothing.
*
* @param cql The CQL String to execute
* @param options The {@link QueryOptions} to use. Only applies to cql statements that can use QueryOptions.
*/
void executeAsynchronously(String cql, QueryOptions options) throws DataAccessException;
/**
* Executes the supplied Query Asynchronously and returns nothing.
*
* @param cql The CQL String to execute
* @param listener The {@link Runnable} to register with the {@link ResultSetFuture}
*
* @see queryAsyncronously for Reads
*/
void executeAsynchronously(String cql, Runnable listener) throws DataAccessException;
/**
* Executes the supplied Query Asynchronously and returns nothing.
*
* @param cql The CQL String to execute
* @param listener The {@link Runnable} to register with the {@link ResultSetFuture}
* @param executor The {@link Executor} to regsiter with the {@link ResultSetFuture}
*
* @see queryAsyncronously for Reads
*/
void executeAsynchronously(String cql, Runnable listener, Executor executor) throws DataAccessException;
/**
* Executes the supplied Query Asynchronously and returns nothing.
*
* @param cql The CQL String to execute
* @param listener The {@link AsynchronousQueryListener} to register with the {@link ResultSetFuture}
*
* @see queryAsyncronously for Reads
*/
void executeAsynchronously(String cql, AsynchronousQueryListener listener) throws DataAccessException;
/**
* Executes the supplied Query Asynchronously and returns nothing.
*
* @param cql The CQL String to execute
* @param listener The {@link AsynchronousQueryListener} to register with the {@link ResultSetFuture}
* @param executor The {@link Executor} to regsiter with the {@link ResultSetFuture}
*
* @see queryAsyncronously for Reads
*/
void executeAsynchronously(String cql, AsynchronousQueryListener listener, Executor executor)
throws DataAccessException;
/**
* Executes the supplied CQL Truncate Asynchronously and returns nothing.
*
@@ -131,13 +182,6 @@ public interface CqlOperations {
*/
void executeAsynchronously(Truncate truncate) throws DataAccessException;
/**
* Executes the supplied Query Asynchronously and returns nothing.
*
* @param cql The {@link Query} to execute
*/
void executeAsynchronously(String cql, QueryOptions options) throws DataAccessException;
/**
* Executes the supplied CQL Delete Asynchronously and returns nothing.
*
@@ -173,6 +217,35 @@ public interface CqlOperations {
*/
void executeAsynchronously(Query query) throws DataAccessException;
/**
* Executes the supplied CQL Query Asynchronously and returns nothing.
*
* @param query The {@link Query} to execute
*/
void executeAsynchronously(Query query, Runnable runnable) throws DataAccessException;
/**
* Executes the supplied CQL Query Asynchronously and returns nothing.
*
* @param query The {@link Query} to execute
*/
void executeAsynchronously(Query query, AsynchronousQueryListener listener) throws DataAccessException;
/**
* Executes the supplied CQL Query Asynchronously and returns nothing.
*
* @param query The {@link Query} to execute
*/
void executeAsynchronously(Query query, Runnable runnable, Executor executor) throws DataAccessException;
/**
* Executes the supplied CQL Query Asynchronously and returns nothing.
*
* @param query The {@link Query} to execute
*/
void executeAsynchronously(Query query, AsynchronousQueryListener listener, Executor executor)
throws DataAccessException;
/**
* Executes the provided CQL Query, and extracts the results with the ResultSetExtractor. This uses default Query
* Options when extracting the ResultSet.

View File

@@ -605,11 +605,130 @@ public class CqlTemplate extends CassandraAccessor implements CqlOperations {
doExecuteAsync(addQueryOptions(new SimpleStatement(cql), options));
}
@Override
public void executeAsynchronously(String cql, Runnable listener) throws DataAccessException {
executeAsynchronously(cql, listener, new Executor() {
@Override
public void execute(Runnable command) {
command.run();
}
});
}
@Override
public void executeAsynchronously(final String cql, final Runnable listener, final Executor executor)
throws DataAccessException {
execute(new SessionCallback<Object>() {
@Override
public Object doInSession(Session s) throws DataAccessException {
Statement statement = new SimpleStatement(cql);
final ResultSetFuture rsf = s.executeAsync(statement);
rsf.addListener(listener, executor);
return null;
}
});
}
@Override
public void executeAsynchronously(String cql, AsynchronousQueryListener listener) throws DataAccessException {
executeAsynchronously(cql, listener, new Executor() {
@Override
public void execute(Runnable command) {
command.run();
}
});
}
@Override
public void executeAsynchronously(final String cql, final AsynchronousQueryListener listener, final Executor executor)
throws DataAccessException {
execute(new SessionCallback<Object>() {
@Override
public Object doInSession(Session s) throws DataAccessException {
Statement statement = new SimpleStatement(cql);
final ResultSetFuture rsf = s.executeAsync(statement);
Runnable wrapper = new Runnable() {
@Override
public void run() {
listener.onQueryComplete(rsf);
}
};
rsf.addListener(wrapper, executor);
return null;
}
});
}
@Override
public void executeAsynchronously(Query query) throws DataAccessException {
doExecuteAsync(query);
}
@Override
public void executeAsynchronously(Query query, Runnable listener) throws DataAccessException {
executeAsynchronously(query, listener, new Executor() {
@Override
public void execute(Runnable command) {
command.run();
}
});
}
@Override
public void executeAsynchronously(Query query, AsynchronousQueryListener listener) throws DataAccessException {
executeAsynchronously(query, listener, new Executor() {
@Override
public void execute(Runnable command) {
command.run();
}
});
}
@Override
public void executeAsynchronously(final Query query, final Runnable listener, final Executor executor)
throws DataAccessException {
execute(new SessionCallback<Object>() {
@Override
public Object doInSession(Session s) throws DataAccessException {
final ResultSetFuture rsf = s.executeAsync(query);
rsf.addListener(listener, executor);
return null;
}
});
}
@Override
public void executeAsynchronously(final Query query, final AsynchronousQueryListener listener, final Executor executor)
throws DataAccessException {
execute(new SessionCallback<Object>() {
@Override
public Object doInSession(Session s) throws DataAccessException {
final ResultSetFuture rsf = s.executeAsync(query);
Runnable wrapper = new Runnable() {
@Override
public void run() {
listener.onQueryComplete(rsf);
}
};
rsf.addListener(wrapper, executor);
return null;
}
});
}
@Override
public void process(ResultSet resultSet, RowCallbackHandler rch) throws DataAccessException {
try {