From 3dae3fd8a9509b9946ca8e5452a9d0d934bc845b Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 14 Jan 2016 17:49:01 -0500 Subject: [PATCH] Refine ListenableFutureCallback policy for exceptions This change updates all cases where callbacks are invoked to catch and suppress errors (since there is not match to do with and error from a callback be it success or failure). Also updated is the contract itself to clarify this and emphasize the callbacks are really notifications for the outcome of the ListenableFuture not the callbacks themselves. Issue: SPR-13785 --- .../scheduling/annotation/AsyncResult.java | 11 +++++- .../util/concurrent/FailureCallback.java | 10 ++--- .../util/concurrent/ListenableFuture.java | 23 +++++------ .../concurrent/ListenableFutureAdapter.java | 18 ++++++++- .../concurrent/ListenableFutureCallback.java | 4 +- .../ListenableFutureCallbackRegistry.java | 38 ++++++++++++++----- .../util/concurrent/SuccessCallback.java | 8 ++-- ...tractPromiseToListenableFutureAdapter.java | 5 ++- 8 files changed, 78 insertions(+), 39 deletions(-) diff --git a/spring-context/src/main/java/org/springframework/scheduling/annotation/AsyncResult.java b/spring-context/src/main/java/org/springframework/scheduling/annotation/AsyncResult.java index ae6ecc4271..48edaf62da 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/annotation/AsyncResult.java +++ b/spring-context/src/main/java/org/springframework/scheduling/annotation/AsyncResult.java @@ -37,6 +37,7 @@ import org.springframework.util.concurrent.SuccessCallback; * to the caller. * * @author Juergen Hoeller + * @author Rossen Stoyanchev * @since 3.0 * @see Async * @see #forValue(Object) @@ -103,10 +104,16 @@ public class AsyncResult implements ListenableFuture { @Override public void addCallback(SuccessCallback successCallback, FailureCallback failureCallback) { try { - successCallback.onSuccess(this.value); + if (this.executionException != null) { + Throwable cause = this.executionException.getCause(); + failureCallback.onFailure(cause != null ? cause : this.executionException); + } + else { + successCallback.onSuccess(this.value); + } } catch (Throwable ex) { - failureCallback.onFailure(ex); + // Ignore } } diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/FailureCallback.java b/spring-core/src/main/java/org/springframework/util/concurrent/FailureCallback.java index caac2874b5..da55327e83 100644 --- a/spring-core/src/main/java/org/springframework/util/concurrent/FailureCallback.java +++ b/spring-core/src/main/java/org/springframework/util/concurrent/FailureCallback.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,8 +17,7 @@ package org.springframework.util.concurrent; /** - * Defines the contract for failure callbacks that accept the result of a - * {@link ListenableFuture}. + * Failure callback for a {@link ListenableFuture}. * * @author Sebastien Deleuze * @since 4.1 @@ -26,8 +25,9 @@ package org.springframework.util.concurrent; public interface FailureCallback { /** - * Called when the {@link ListenableFuture} fails to complete. - * @param ex the exception that triggered the failure + * Called when the {@link ListenableFuture} completes with failure. + *

Note that Exceptions raised by this method are ignored. + * @param ex the failure */ void onFailure(Throwable ex); diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFuture.java b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFuture.java index 6250b08f58..cf84909df9 100644 --- a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFuture.java +++ b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFuture.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,11 +19,11 @@ package org.springframework.util.concurrent; import java.util.concurrent.Future; /** - * Extends the {@link Future} interface with the capability to accept completion - * callbacks. If the future has already completed when the callback is added, the - * callback will be triggered immediately. + * Extend {@link Future} with the capability to accept completion callbacks. + * If the future has completed when the callback is added, the callback is + * triggered immediately. *

Inspired by {@code com.google.common.util.concurrent.ListenableFuture}. - + * * @author Arjen Poutsma * @author Sebastien Deleuze * @since 4.0 @@ -31,20 +31,15 @@ import java.util.concurrent.Future; public interface ListenableFuture extends Future { /** - * Registers the given callback to this {@code ListenableFuture}. The callback will - * be triggered when this {@code Future} is complete or, if it is already complete, - * immediately. + * Register the given {@code ListenableFutureCallback}. * @param callback the callback to register */ void addCallback(ListenableFutureCallback callback); /** - * Registers the given success and failure callbacks to this {@code ListenableFuture}. - * The callback will be triggered when this {@code Future} is complete or, if it is - * already complete immediately. This is a Java 8 lambdas compliant alternative to - * {@link #addCallback(ListenableFutureCallback)}. - * @param successCallback the success callback to register - * @param failureCallback the failure callback to register + * Java 8 lambda-friendly alternative with success and failure callbacks. + * @param successCallback the success callback + * @param failureCallback the failure callback * @since 4.1 */ void addCallback(SuccessCallback successCallback, FailureCallback failureCallback); diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureAdapter.java b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureAdapter.java index 268a64964d..8d09183264 100644 --- a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureAdapter.java +++ b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureAdapter.java @@ -52,20 +52,34 @@ public abstract class ListenableFutureAdapter extends FutureAdapter listenableAdaptee.addCallback(new ListenableFutureCallback() { @Override public void onSuccess(S result) { + T adapted; try { - successCallback.onSuccess(adaptInternal(result)); + adapted = adaptInternal(result); } catch (ExecutionException ex) { Throwable cause = ex.getCause(); onFailure(cause != null ? cause : ex); + return; } catch (Throwable ex) { onFailure(ex); + return; + } + try { + successCallback.onSuccess(adapted); + } + catch (Throwable e) { + // Ignore } } @Override public void onFailure(Throwable ex) { - failureCallback.onFailure(ex); + try { + failureCallback.onFailure(ex); + } + catch (Throwable t) { + // Ignore + } } }); } diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallback.java b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallback.java index 633794458c..515ee4061c 100644 --- a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallback.java +++ b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallback.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,7 @@ package org.springframework.util.concurrent; /** - * Defines the contract for callbacks that accept the result of a + * Callback mechanism for the outcome, success or failure, from a * {@link ListenableFuture}. * * @author Arjen Poutsma diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallbackRegistry.java b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallbackRegistry.java index 1c4be730da..d9964598bd 100644 --- a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallbackRegistry.java +++ b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallbackRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,12 +22,14 @@ import java.util.Queue; import org.springframework.util.Assert; /** - * Registry for {@link ListenableFutureCallback} instances. + * Helper class for {@link ListenableFuture} implementations that maintains a + * of success and failure callbacks and helps to notify them. * *

Inspired by {@code com.google.common.util.concurrent.ExecutionList}. * * @author Arjen Poutsma * @author Sebastien Deleuze + * @author Rossen Stoyanchev * @since 4.0 */ public class ListenableFutureCallbackRegistry { @@ -47,7 +49,6 @@ public class ListenableFutureCallbackRegistry { * Add the given callback to this registry. * @param callback the callback to add */ - @SuppressWarnings("unchecked") public void addCallback(ListenableFutureCallback callback) { Assert.notNull(callback, "'callback' must not be null"); synchronized (this.mutex) { @@ -57,15 +58,34 @@ public class ListenableFutureCallbackRegistry { this.failureCallbacks.add(callback); break; case SUCCESS: - callback.onSuccess((T) this.result); + notifySuccess(callback); break; case FAILURE: - callback.onFailure((Throwable) this.result); + notifyFailure(callback); break; } } } + @SuppressWarnings("unchecked") + private void notifySuccess(SuccessCallback callback) { + try { + callback.onSuccess((T) this.result); + } + catch (Throwable ex) { + // Ignore + } + } + + private void notifyFailure(FailureCallback callback) { + try { + callback.onFailure((Throwable) this.result); + } + catch (Throwable ex) { + // Ignore + } + } + /** * Add the given success callback to this registry. * @param callback the success callback to add @@ -80,7 +100,7 @@ public class ListenableFutureCallbackRegistry { this.successCallbacks.add(callback); break; case SUCCESS: - callback.onSuccess((T) this.result); + notifySuccess(callback); break; } } @@ -99,7 +119,7 @@ public class ListenableFutureCallbackRegistry { this.failureCallbacks.add(callback); break; case FAILURE: - callback.onFailure((Throwable) this.result); + notifyFailure(callback); break; } } @@ -115,7 +135,7 @@ public class ListenableFutureCallbackRegistry { this.state = State.SUCCESS; this.result = result; while (!this.successCallbacks.isEmpty()) { - this.successCallbacks.poll().onSuccess(result); + notifySuccess(this.successCallbacks.poll()); } } } @@ -130,7 +150,7 @@ public class ListenableFutureCallbackRegistry { this.state = State.FAILURE; this.result = ex; while (!this.failureCallbacks.isEmpty()) { - this.failureCallbacks.poll().onFailure(ex); + notifyFailure(this.failureCallbacks.poll()); } } } diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/SuccessCallback.java b/spring-core/src/main/java/org/springframework/util/concurrent/SuccessCallback.java index 65f3041162..269f804476 100644 --- a/spring-core/src/main/java/org/springframework/util/concurrent/SuccessCallback.java +++ b/spring-core/src/main/java/org/springframework/util/concurrent/SuccessCallback.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,8 +17,7 @@ package org.springframework.util.concurrent; /** - * Defines the contract for success callbacks that accept the result of a - * {@link ListenableFuture}. + * Success callback for a {@link ListenableFuture}. * * @author Sebastien Deleuze * @since 4.1 @@ -26,7 +25,8 @@ package org.springframework.util.concurrent; public interface SuccessCallback { /** - * Called when the {@link ListenableFuture} successfully completes. + * Called when the {@link ListenableFuture} completes with success. + *

Note that Exceptions raised by this method are ignored. * @param result the result */ void onSuccess(T result); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java index d2d6bc4ebf..a96fba8feb 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java @@ -53,12 +53,15 @@ abstract class AbstractPromiseToListenableFutureAdapter implements Listena this.promise.onSuccess(new Consumer() { @Override public void accept(S result) { + T adapted; try { - registry.success(adapt(result)); + adapted = adapt(result); } catch (Throwable ex) { registry.failure(ex); + return; } + registry.success(adapted); } });