From 3ab62a1ddabe9ebe35cfab001dd5119bbe00372a Mon Sep 17 00:00:00 2001 From: David T Webb Date: Thu, 27 Mar 2014 15:34:49 -0400 Subject: [PATCH] DATACASS-116 - Completed Enhanced executeAsychronously() API. --- .../cassandra/core/CqlOperations.java | 89 +++++++++++-- .../cassandra/core/CqlTemplate.java | 119 ++++++++++++++++++ 2 files changed, 200 insertions(+), 8 deletions(-) diff --git a/spring-cql/src/main/java/org/springframework/cassandra/core/CqlOperations.java b/spring-cql/src/main/java/org/springframework/cassandra/core/CqlOperations.java index 1bfb8e7e4..a8e51ad28 100644 --- a/spring-cql/src/main/java/org/springframework/cassandra/core/CqlOperations.java +++ b/spring-cql/src/main/java/org/springframework/cassandra/core/CqlOperations.java @@ -120,10 +120,61 @@ public interface CqlOperations { /** * Executes the supplied Query Asynchronously and returns nothing. * - * @param cql The {@link Query} to execute + * @param cql The CQL String to execute */ void executeAsynchronously(String cql) throws DataAccessException; + /** + * Executes the supplied Query Asynchronously and returns nothing. + * + * @param cql The CQL String to execute + * @param options The {@link QueryOptions} to use. Only applies to cql statements that can use QueryOptions. + */ + void executeAsynchronously(String cql, QueryOptions options) throws DataAccessException; + + /** + * Executes the supplied Query Asynchronously and returns nothing. + * + * @param cql The CQL String to execute + * @param listener The {@link Runnable} to register with the {@link ResultSetFuture} + * + * @see queryAsyncronously for Reads + */ + void executeAsynchronously(String cql, Runnable listener) throws DataAccessException; + + /** + * Executes the supplied Query Asynchronously and returns nothing. + * + * @param cql The CQL String to execute + * @param listener The {@link Runnable} to register with the {@link ResultSetFuture} + * @param executor The {@link Executor} to regsiter with the {@link ResultSetFuture} + * + * @see queryAsyncronously for Reads + */ + void executeAsynchronously(String cql, Runnable listener, Executor executor) throws DataAccessException; + + /** + * Executes the supplied Query Asynchronously and returns nothing. + * + * @param cql The CQL String to execute + * @param listener The {@link AsynchronousQueryListener} to register with the {@link ResultSetFuture} + * + * @see queryAsyncronously for Reads + */ + void executeAsynchronously(String cql, AsynchronousQueryListener listener) throws DataAccessException; + + /** + * Executes the supplied Query Asynchronously and returns nothing. + * + * @param cql The CQL String to execute + * @param listener The {@link AsynchronousQueryListener} to register with the {@link ResultSetFuture} + * @param executor The {@link Executor} to regsiter with the {@link ResultSetFuture} + * + * @see queryAsyncronously for Reads + */ + void executeAsynchronously(String cql, AsynchronousQueryListener listener, Executor executor) + throws DataAccessException; + /** * Executes the supplied CQL Truncate Asynchronously and returns nothing. * @@ -131,13 +182,6 @@ public interface CqlOperations { */ void executeAsynchronously(Truncate truncate) throws DataAccessException; - /** - * Executes the supplied Query Asynchronously and returns nothing. - * - * @param cql The {@link Query} to execute - */ - void executeAsynchronously(String cql, QueryOptions options) throws DataAccessException; - /** * Executes the supplied CQL Delete Asynchronously and returns nothing. * @@ -173,6 +217,35 @@ public interface CqlOperations { */ void executeAsynchronously(Query query) throws DataAccessException; + /** + * Executes the supplied CQL Query Asynchronously and returns nothing. + * + * @param query The {@link Query} to execute + */ + void executeAsynchronously(Query query, Runnable runnable) throws DataAccessException; + + /** + * Executes the supplied CQL Query Asynchronously and returns nothing. + * + * @param query The {@link Query} to execute + */ + void executeAsynchronously(Query query, AsynchronousQueryListener listener) throws DataAccessException; + + /** + * Executes the supplied CQL Query Asynchronously and returns nothing. + * + * @param query The {@link Query} to execute + */ + void executeAsynchronously(Query query, Runnable runnable, Executor executor) throws DataAccessException; + + /** + * Executes the supplied CQL Query Asynchronously and returns nothing. + * + * @param query The {@link Query} to execute + */ + void executeAsynchronously(Query query, AsynchronousQueryListener listener, Executor executor) + throws DataAccessException; + /** * Executes the provided CQL Query, and extracts the results with the ResultSetExtractor. This uses default Query * Options when extracting the ResultSet. diff --git a/spring-cql/src/main/java/org/springframework/cassandra/core/CqlTemplate.java b/spring-cql/src/main/java/org/springframework/cassandra/core/CqlTemplate.java index 267247a70..52634405a 100644 --- a/spring-cql/src/main/java/org/springframework/cassandra/core/CqlTemplate.java +++ b/spring-cql/src/main/java/org/springframework/cassandra/core/CqlTemplate.java @@ -605,11 +605,130 @@ public class CqlTemplate extends CassandraAccessor implements CqlOperations { doExecuteAsync(addQueryOptions(new SimpleStatement(cql), options)); } + @Override + public void executeAsynchronously(String cql, Runnable listener) throws DataAccessException { + executeAsynchronously(cql, listener, new Executor() { + + @Override + public void execute(Runnable command) { + command.run(); + } + + }); + } + + @Override + public void executeAsynchronously(final String cql, final Runnable listener, final Executor executor) + throws DataAccessException { + + execute(new SessionCallback() { + @Override + public Object doInSession(Session s) throws DataAccessException { + Statement statement = new SimpleStatement(cql); + final ResultSetFuture rsf = s.executeAsync(statement); + rsf.addListener(listener, executor); + return null; + } + }); + + } + + @Override + public void executeAsynchronously(String cql, AsynchronousQueryListener listener) throws DataAccessException { + executeAsynchronously(cql, listener, new Executor() { + + @Override + public void execute(Runnable command) { + command.run(); + } + + }); + + } + + @Override + public void executeAsynchronously(final String cql, final AsynchronousQueryListener listener, final Executor executor) + throws DataAccessException { + + execute(new SessionCallback() { + @Override + public Object doInSession(Session s) throws DataAccessException { + Statement statement = new SimpleStatement(cql); + final ResultSetFuture rsf = s.executeAsync(statement); + Runnable wrapper = new Runnable() { + @Override + public void run() { + listener.onQueryComplete(rsf); + } + }; + rsf.addListener(wrapper, executor); + return null; + } + }); + + } + @Override public void executeAsynchronously(Query query) throws DataAccessException { doExecuteAsync(query); } + @Override + public void executeAsynchronously(Query query, Runnable listener) throws DataAccessException { + executeAsynchronously(query, listener, new Executor() { + + @Override + public void execute(Runnable command) { + command.run(); + } + }); + } + + @Override + public void executeAsynchronously(Query query, AsynchronousQueryListener listener) throws DataAccessException { + executeAsynchronously(query, listener, new Executor() { + + @Override + public void execute(Runnable command) { + command.run(); + } + }); + } + + @Override + public void executeAsynchronously(final Query query, final Runnable listener, final Executor executor) + throws DataAccessException { + execute(new SessionCallback() { + @Override + public Object doInSession(Session s) throws DataAccessException { + final ResultSetFuture rsf = s.executeAsync(query); + rsf.addListener(listener, executor); + return null; + } + }); + } + + @Override + public void executeAsynchronously(final Query query, final AsynchronousQueryListener listener, final Executor executor) + throws DataAccessException { + + execute(new SessionCallback() { + @Override + public Object doInSession(Session s) throws DataAccessException { + final ResultSetFuture rsf = s.executeAsync(query); + Runnable wrapper = new Runnable() { + @Override + public void run() { + listener.onQueryComplete(rsf); + } + }; + rsf.addListener(wrapper, executor); + return null; + } + }); + + } + @Override public void process(ResultSet resultSet, RowCallbackHandler rch) throws DataAccessException { try {