Merge pull request #571 from sdeleuze/SPR-11820

Make ListenableFuture compliant with Java 8 lambda
This commit is contained in:
Juergen Hoeller
2014-07-16 16:41:41 +02:00
14 changed files with 392 additions and 41 deletions

View File

@@ -0,0 +1,34 @@
/*
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.util.concurrent;
/**
* Defines the contract for failure callbacks that accept the result of a
* {@link ListenableFuture}.
*
* @author Sebastien Deleuze
* @since 4.1
*/
public interface FailureCallback {
/**
* Called when the {@link ListenableFuture} fails to complete.
* @param t the exception that triggered the failure
*/
void onFailure(Throwable t);
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@ import java.util.concurrent.Future;
* <p>Inspired by {@code com.google.common.util.concurrent.ListenableFuture}.
* @author Arjen Poutsma
* @author Sebastien Deleuze
* @since 4.0
*/
public interface ListenableFuture<T> extends Future<T> {
@@ -37,4 +38,15 @@ public interface ListenableFuture<T> extends Future<T> {
*/
void addCallback(ListenableFutureCallback<? super T> callback);
/**
* Registers the given success and failure callbacks to this {@code ListenableFuture}.
* The callback will be triggered when this {@code Future} is complete or, if it is
* already complete immediately. This is a Java 8 lambdas compliant alternative to
* {@link #addCallback(ListenableFutureCallback)}.
* @param successCallback the success callback to register
* @param failureCallback the failure callback to register
* @since 4.1
*/
void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -44,12 +44,18 @@ public abstract class ListenableFutureAdapter<T, S> extends FutureAdapter<T, S>
@Override
public void addCallback(final ListenableFutureCallback<? super T> callback) {
addCallback(callback, callback);
}
@Override
public void addCallback(final SuccessCallback<? super T> successCallback,
final FailureCallback failureCallback) {
ListenableFuture<S> listenableAdaptee = (ListenableFuture<S>) getAdaptee();
listenableAdaptee.addCallback(new ListenableFutureCallback<S>() {
@Override
public void onSuccess(S result) {
try {
callback.onSuccess(adaptInternal(result));
successCallback.onSuccess(adaptInternal(result));
}
catch (ExecutionException ex) {
Throwable cause = ex.getCause();
@@ -62,8 +68,9 @@ public abstract class ListenableFutureAdapter<T, S> extends FutureAdapter<T, S>
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
failureCallback.onFailure(t);
}
});
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,20 +21,9 @@ package org.springframework.util.concurrent;
* {@link ListenableFuture}.
*
* @author Arjen Poutsma
* @author Sebastien Deleuze
* @since 4.0
*/
public interface ListenableFutureCallback<T> {
/**
* Called when the {@link ListenableFuture} successfully completes.
* @param result the result
*/
void onSuccess(T result);
/**
* Called when the {@link ListenableFuture} fails to complete.
* @param t the exception that triggered the failure
*/
void onFailure(Throwable t);
public interface ListenableFutureCallback<T> extends SuccessCallback<T>, FailureCallback {
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,12 +27,16 @@ import org.springframework.util.Assert;
* <p>Inspired by {@code com.google.common.util.concurrent.ExecutionList}.
*
* @author Arjen Poutsma
* @author Sebastien Deleuze
* @since 4.0
*/
public class ListenableFutureCallbackRegistry<T> {
private final Queue<ListenableFutureCallback<? super T>> callbacks =
new LinkedList<ListenableFutureCallback<? super T>>();
private final Queue<SuccessCallback<? super T>> successCallbacks =
new LinkedList<SuccessCallback<? super T>>();
private final Queue<FailureCallback> failureCallbacks =
new LinkedList<FailureCallback>();
private State state = State.NEW;
@@ -52,7 +56,8 @@ public class ListenableFutureCallbackRegistry<T> {
synchronized (mutex) {
switch (state) {
case NEW:
callbacks.add(callback);
successCallbacks.add(callback);
failureCallbacks.add(callback);
break;
case SUCCESS:
callback.onSuccess((T)result);
@@ -64,6 +69,50 @@ public class ListenableFutureCallbackRegistry<T> {
}
}
/**
* Adds the given success callback to this registry.
* @param callback the success callback to add
*
* @since 4.1
*/
@SuppressWarnings("unchecked")
public void addSuccessCallback(SuccessCallback<? super T> callback) {
Assert.notNull(callback, "'callback' must not be null");
synchronized (mutex) {
switch (state) {
case NEW:
successCallbacks.add(callback);
break;
case SUCCESS:
callback.onSuccess((T)result);
break;
}
}
}
/**
* Adds the given failure callback to this registry.
* @param callback the failure callback to add
*
* @since 4.1
*/
@SuppressWarnings("unchecked")
public void addFailureCallback(FailureCallback callback) {
Assert.notNull(callback, "'callback' must not be null");
synchronized (mutex) {
switch (state) {
case NEW:
failureCallbacks.add(callback);
break;
case FAILURE:
callback.onFailure((Throwable) result);
break;
}
}
}
/**
* Triggers a {@link ListenableFutureCallback#onSuccess(Object)} call on all added
* callbacks with the given result
@@ -74,8 +123,8 @@ public class ListenableFutureCallbackRegistry<T> {
state = State.SUCCESS;
this.result = result;
while (!callbacks.isEmpty()) {
callbacks.poll().onSuccess(result);
while (!successCallbacks.isEmpty()) {
successCallbacks.poll().onSuccess(result);
}
}
}
@@ -90,8 +139,8 @@ public class ListenableFutureCallbackRegistry<T> {
state = State.FAILURE;
this.result = t;
while (!callbacks.isEmpty()) {
callbacks.poll().onFailure(t);
while (!failureCallbacks.isEmpty()) {
failureCallbacks.poll().onFailure(t);
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -57,6 +57,12 @@ public class ListenableFutureTask<T> extends FutureTask<T> implements Listenable
this.callbacks.addCallback(callback);
}
@Override
public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) {
this.callbacks.addSuccessCallback(successCallback);
this.callbacks.addFailureCallback(failureCallback);
}
@Override
protected final void done() {
Throwable cause;

View File

@@ -84,6 +84,11 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
this.listenableFuture.addCallback(callback);
}
@Override
public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) {
this.listenableFuture.addCallback(successCallback, failureCallback);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
this.settableTask.setCancelled();

View File

@@ -0,0 +1,34 @@
/*
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.util.concurrent;
/**
* Defines the contract for success callbacks that accept the result of a
* {@link ListenableFuture}.
*
* @author Sebastien Deleuze
* @since 4.1
*/
public interface SuccessCallback<T> {
/**
* Called when the {@link ListenableFuture} successfully completes.
* @param result the result
*/
void onSuccess(T result);
}