SettableListenableFuture consistently tracks cancellation state
Issue: SPR-15202
This commit is contained in:
@@ -27,13 +27,14 @@ import org.springframework.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
* A {@link org.springframework.util.concurrent.ListenableFuture ListenableFuture}
|
||||
* whose value can be set via {@link #set(Object)} or
|
||||
* {@link #setException(Throwable)}. It may also be cancelled.
|
||||
* whose value can be set via {@link #set(T)} or {@link #setException(Throwable)}.
|
||||
* It may also be cancelled.
|
||||
*
|
||||
* <p>Inspired by {@code com.google.common.util.concurrent.SettableFuture}.
|
||||
*
|
||||
* @author Mattias Severson
|
||||
* @author Rossen Stoyanchev
|
||||
* @author Juergen Hoeller
|
||||
* @since 4.1
|
||||
*/
|
||||
public class SettableListenableFuture<T> implements ListenableFuture<T> {
|
||||
@@ -92,8 +93,8 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
|
||||
|
||||
@Override
|
||||
public boolean cancel(boolean mayInterruptIfRunning) {
|
||||
this.settableTask.setCancelled();
|
||||
boolean cancelled = this.listenableFuture.cancel(mayInterruptIfRunning);
|
||||
boolean cancelled = this.settableTask.setCancelled();
|
||||
this.listenableFuture.cancel(mayInterruptIfRunning);
|
||||
if (cancelled && mayInterruptIfRunning) {
|
||||
interruptTask();
|
||||
}
|
||||
@@ -102,12 +103,12 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
|
||||
|
||||
@Override
|
||||
public boolean isCancelled() {
|
||||
return this.listenableFuture.isCancelled();
|
||||
return this.settableTask.isCancelled();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDone() {
|
||||
return this.listenableFuture.isDone();
|
||||
return this.settableTask.isDone();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -152,26 +153,28 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
|
||||
|
||||
private static final Object NO_VALUE = new Object();
|
||||
|
||||
private static final Object CANCELLED = new Object();
|
||||
|
||||
private final AtomicReference<Object> value = new AtomicReference<>(NO_VALUE);
|
||||
|
||||
private volatile boolean cancelled = false;
|
||||
|
||||
public boolean setValue(T value) {
|
||||
if (this.cancelled) {
|
||||
return false;
|
||||
}
|
||||
return this.value.compareAndSet(NO_VALUE, value);
|
||||
}
|
||||
|
||||
public boolean setException(Throwable exception) {
|
||||
if (this.cancelled) {
|
||||
return false;
|
||||
}
|
||||
return this.value.compareAndSet(NO_VALUE, exception);
|
||||
}
|
||||
|
||||
public void setCancelled() {
|
||||
this.cancelled = true;
|
||||
public boolean setCancelled() {
|
||||
return this.value.compareAndSet(NO_VALUE, CANCELLED);
|
||||
}
|
||||
|
||||
public boolean isCancelled() {
|
||||
return (this.value.get() == CANCELLED);
|
||||
}
|
||||
|
||||
public boolean isDone() {
|
||||
return (this.value.get() != NO_VALUE);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -30,6 +30,7 @@ import static org.mockito.Mockito.*;
|
||||
|
||||
/**
|
||||
* @author Mattias Severson
|
||||
* @author Juergen Hoeller
|
||||
*/
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
public class SettableListenableFutureTests {
|
||||
@@ -39,29 +40,31 @@ public class SettableListenableFutureTests {
|
||||
|
||||
@Test
|
||||
public void validateInitialValues() {
|
||||
assertFalse(settableListenableFuture.isDone());
|
||||
assertFalse(settableListenableFuture.isCancelled());
|
||||
assertFalse(settableListenableFuture.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void returnsSetValue() throws ExecutionException, InterruptedException {
|
||||
String string = "hello";
|
||||
boolean wasSet = settableListenableFuture.set(string);
|
||||
assertTrue(wasSet);
|
||||
assertTrue(settableListenableFuture.set(string));
|
||||
assertThat(settableListenableFuture.get(), equalTo(string));
|
||||
assertFalse(settableListenableFuture.isCancelled());
|
||||
assertTrue(settableListenableFuture.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void setValueUpdatesDoneStatus() {
|
||||
settableListenableFuture.set("hello");
|
||||
assertFalse(settableListenableFuture.isCancelled());
|
||||
assertTrue(settableListenableFuture.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void throwsSetExceptionWrappedInExecutionException() throws ExecutionException, InterruptedException {
|
||||
Throwable exception = new RuntimeException();
|
||||
boolean wasSet = settableListenableFuture.setException(exception);
|
||||
assertTrue(wasSet);
|
||||
assertTrue(settableListenableFuture.setException(exception));
|
||||
|
||||
try {
|
||||
settableListenableFuture.get();
|
||||
fail("Expected ExecutionException");
|
||||
@@ -69,13 +72,16 @@ public class SettableListenableFutureTests {
|
||||
catch (ExecutionException ex) {
|
||||
assertThat(ex.getCause(), equalTo(exception));
|
||||
}
|
||||
|
||||
assertFalse(settableListenableFuture.isCancelled());
|
||||
assertTrue(settableListenableFuture.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void throwsSetErrorWrappedInExecutionException() throws ExecutionException, InterruptedException {
|
||||
Throwable exception = new OutOfMemoryError();
|
||||
boolean wasSet = settableListenableFuture.setException(exception);
|
||||
assertTrue(wasSet);
|
||||
assertTrue(settableListenableFuture.setException(exception));
|
||||
|
||||
try {
|
||||
settableListenableFuture.get();
|
||||
fail("Expected ExecutionException");
|
||||
@@ -83,12 +89,16 @@ public class SettableListenableFutureTests {
|
||||
catch (ExecutionException ex) {
|
||||
assertThat(ex.getCause(), equalTo(exception));
|
||||
}
|
||||
|
||||
assertFalse(settableListenableFuture.isCancelled());
|
||||
assertTrue(settableListenableFuture.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void setValueTriggersCallback() {
|
||||
String string = "hello";
|
||||
final String[] callbackHolder = new String[1];
|
||||
|
||||
settableListenableFuture.addCallback(new ListenableFutureCallback<String>() {
|
||||
@Override
|
||||
public void onSuccess(String result) {
|
||||
@@ -99,14 +109,18 @@ public class SettableListenableFutureTests {
|
||||
fail("Expected onSuccess() to be called");
|
||||
}
|
||||
});
|
||||
|
||||
settableListenableFuture.set(string);
|
||||
assertThat(callbackHolder[0], equalTo(string));
|
||||
assertFalse(settableListenableFuture.isCancelled());
|
||||
assertTrue(settableListenableFuture.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void setValueTriggersCallbackOnlyOnce() {
|
||||
String string = "hello";
|
||||
final String[] callbackHolder = new String[1];
|
||||
|
||||
settableListenableFuture.addCallback(new ListenableFutureCallback<String>() {
|
||||
@Override
|
||||
public void onSuccess(String result) {
|
||||
@@ -117,15 +131,19 @@ public class SettableListenableFutureTests {
|
||||
fail("Expected onSuccess() to be called");
|
||||
}
|
||||
});
|
||||
|
||||
settableListenableFuture.set(string);
|
||||
assertFalse(settableListenableFuture.set("good bye"));
|
||||
assertThat(callbackHolder[0], equalTo(string));
|
||||
assertFalse(settableListenableFuture.isCancelled());
|
||||
assertTrue(settableListenableFuture.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void setExceptionTriggersCallback() {
|
||||
Throwable exception = new RuntimeException();
|
||||
final Throwable[] callbackHolder = new Throwable[1];
|
||||
|
||||
settableListenableFuture.addCallback(new ListenableFutureCallback<String>() {
|
||||
@Override
|
||||
public void onSuccess(String result) {
|
||||
@@ -136,14 +154,18 @@ public class SettableListenableFutureTests {
|
||||
callbackHolder[0] = ex;
|
||||
}
|
||||
});
|
||||
|
||||
settableListenableFuture.setException(exception);
|
||||
assertThat(callbackHolder[0], equalTo(exception));
|
||||
assertFalse(settableListenableFuture.isCancelled());
|
||||
assertTrue(settableListenableFuture.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void setExceptionTriggersCallbackOnlyOnce() {
|
||||
Throwable exception = new RuntimeException();
|
||||
final Throwable[] callbackHolder = new Throwable[1];
|
||||
|
||||
settableListenableFuture.addCallback(new ListenableFutureCallback<String>() {
|
||||
@Override
|
||||
public void onSuccess(String result) {
|
||||
@@ -154,20 +176,26 @@ public class SettableListenableFutureTests {
|
||||
callbackHolder[0] = ex;
|
||||
}
|
||||
});
|
||||
|
||||
settableListenableFuture.setException(exception);
|
||||
assertFalse(settableListenableFuture.setException(new IllegalArgumentException()));
|
||||
assertThat(callbackHolder[0], equalTo(exception));
|
||||
assertFalse(settableListenableFuture.isCancelled());
|
||||
assertTrue(settableListenableFuture.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void nullIsAcceptedAsValueToSet() throws ExecutionException, InterruptedException {
|
||||
settableListenableFuture.set(null);
|
||||
assertNull(settableListenableFuture.get());
|
||||
assertFalse(settableListenableFuture.isCancelled());
|
||||
assertTrue(settableListenableFuture.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getWaitsForCompletion() throws ExecutionException, InterruptedException {
|
||||
final String string = "hello";
|
||||
|
||||
new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
@@ -180,8 +208,11 @@ public class SettableListenableFutureTests {
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
|
||||
String value = settableListenableFuture.get();
|
||||
assertThat(value, equalTo(string));
|
||||
assertFalse(settableListenableFuture.isCancelled());
|
||||
assertTrue(settableListenableFuture.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -198,6 +229,7 @@ public class SettableListenableFutureTests {
|
||||
@Test
|
||||
public void getWithTimeoutWaitsForCompletion() throws ExecutionException, InterruptedException, TimeoutException {
|
||||
final String string = "hello";
|
||||
|
||||
new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
@@ -210,65 +242,74 @@ public class SettableListenableFutureTests {
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
String value = settableListenableFuture.get(100L, TimeUnit.MILLISECONDS);
|
||||
|
||||
String value = settableListenableFuture.get(500L, TimeUnit.MILLISECONDS);
|
||||
assertThat(value, equalTo(string));
|
||||
assertFalse(settableListenableFuture.isCancelled());
|
||||
assertTrue(settableListenableFuture.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void cancelPreventsValueFromBeingSet() {
|
||||
boolean wasCancelled = settableListenableFuture.cancel(true);
|
||||
assertTrue(wasCancelled);
|
||||
boolean wasSet = settableListenableFuture.set("hello");
|
||||
assertFalse(wasSet);
|
||||
assertTrue(settableListenableFuture.cancel(true));
|
||||
assertFalse(settableListenableFuture.set("hello"));
|
||||
assertTrue(settableListenableFuture.isCancelled());
|
||||
assertTrue(settableListenableFuture.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void cancelSetsFutureToDone() {
|
||||
settableListenableFuture.cancel(true);
|
||||
assertTrue(settableListenableFuture.isCancelled());
|
||||
assertTrue(settableListenableFuture.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void cancelWithMayInterruptIfRunningTrueCallsOverridenMethod() {
|
||||
public void cancelWithMayInterruptIfRunningTrueCallsOverriddenMethod() {
|
||||
InterruptableSettableListenableFuture tested = new InterruptableSettableListenableFuture();
|
||||
tested.cancel(true);
|
||||
assertTrue(tested.cancel(true));
|
||||
assertTrue(tested.calledInterruptTask());
|
||||
assertTrue(tested.isCancelled());
|
||||
assertTrue(tested.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void cancelWithMayInterruptIfRunningFalseDoesNotCallOverridenMethod() {
|
||||
public void cancelWithMayInterruptIfRunningFalseDoesNotCallOverriddenMethod() {
|
||||
InterruptableSettableListenableFuture tested = new InterruptableSettableListenableFuture();
|
||||
tested.cancel(false);
|
||||
assertTrue(tested.cancel(false));
|
||||
assertFalse(tested.calledInterruptTask());
|
||||
assertTrue(tested.isCancelled());
|
||||
assertTrue(tested.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void setPreventsCancel() {
|
||||
boolean wasSet = settableListenableFuture.set("hello");
|
||||
assertTrue(wasSet);
|
||||
boolean wasCancelled = settableListenableFuture.cancel(true);
|
||||
assertFalse(wasCancelled);
|
||||
assertTrue(settableListenableFuture.set("hello"));
|
||||
assertFalse(settableListenableFuture.cancel(true));
|
||||
assertFalse(settableListenableFuture.isCancelled());
|
||||
assertTrue(settableListenableFuture.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void cancelPreventsExceptionFromBeingSet() {
|
||||
boolean wasCancelled = settableListenableFuture.cancel(true);
|
||||
assertTrue(wasCancelled);
|
||||
boolean wasSet = settableListenableFuture.setException(new RuntimeException());
|
||||
assertFalse(wasSet);
|
||||
assertTrue(settableListenableFuture.cancel(true));
|
||||
assertFalse(settableListenableFuture.setException(new RuntimeException()));
|
||||
assertTrue(settableListenableFuture.isCancelled());
|
||||
assertTrue(settableListenableFuture.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void setExceptionPreventsCancel() {
|
||||
boolean wasSet = settableListenableFuture.setException(new RuntimeException());
|
||||
assertTrue(wasSet);
|
||||
boolean wasCancelled = settableListenableFuture.cancel(true);
|
||||
assertFalse(wasCancelled);
|
||||
assertTrue(settableListenableFuture.setException(new RuntimeException()));
|
||||
assertFalse(settableListenableFuture.cancel(true));
|
||||
assertFalse(settableListenableFuture.isCancelled());
|
||||
assertTrue(settableListenableFuture.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void cancelStateThrowsExceptionWhenCallingGet() throws ExecutionException, InterruptedException {
|
||||
settableListenableFuture.cancel(true);
|
||||
|
||||
try {
|
||||
settableListenableFuture.get();
|
||||
fail("Expected CancellationException");
|
||||
@@ -276,6 +317,9 @@ public class SettableListenableFutureTests {
|
||||
catch (CancellationException ex) {
|
||||
// expected
|
||||
}
|
||||
|
||||
assertTrue(settableListenableFuture.isCancelled());
|
||||
assertTrue(settableListenableFuture.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -292,13 +336,17 @@ public class SettableListenableFutureTests {
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
|
||||
try {
|
||||
settableListenableFuture.get(100L, TimeUnit.MILLISECONDS);
|
||||
settableListenableFuture.get(500L, TimeUnit.MILLISECONDS);
|
||||
fail("Expected CancellationException");
|
||||
}
|
||||
catch (CancellationException ex) {
|
||||
// expected
|
||||
}
|
||||
|
||||
assertTrue(settableListenableFuture.isCancelled());
|
||||
assertTrue(settableListenableFuture.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -312,6 +360,9 @@ public class SettableListenableFutureTests {
|
||||
|
||||
settableListenableFuture.set("hello");
|
||||
verifyNoMoreInteractions(callback);
|
||||
|
||||
assertTrue(settableListenableFuture.isCancelled());
|
||||
assertTrue(settableListenableFuture.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -325,6 +376,9 @@ public class SettableListenableFutureTests {
|
||||
|
||||
settableListenableFuture.setException(new RuntimeException());
|
||||
verifyNoMoreInteractions(callback);
|
||||
|
||||
assertTrue(settableListenableFuture.isCancelled());
|
||||
assertTrue(settableListenableFuture.isDone());
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user