From 2393cfea8a8f50561bacd091501940f8c7ed4245 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 29 May 2017 14:21:26 -0400 Subject: [PATCH] GH-319: Add Acknowledgment to Retry Context [1.x] Fixes #319 Add the `Acknowledgment` to the retry context if the listener type warrants it. Conflicts: spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java src/reference/asciidoc/kafka.adoc --- ...bstractRetryingMessageListenerAdapter.java | 11 +++ ...ngAcknowledgingMessageListenerAdapter.java | 5 +- .../RetryingMessageListenerAdapter.java | 4 +- .../RetryingMessageListenerAdapterTests.java | 77 +++++++++++++++++++ src/reference/asciidoc/kafka.adoc | 8 ++ 5 files changed, 101 insertions(+), 4 deletions(-) create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/RetryingMessageListenerAdapterTests.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractRetryingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractRetryingMessageListenerAdapter.java index ba4c1264..d7b2b3fa 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractRetryingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractRetryingMessageListenerAdapter.java @@ -36,6 +36,17 @@ public class AbstractRetryingMessageListenerAdapter { protected final Log logger = LogFactory.getLog(this.getClass()); + /** + * {@link org.springframework.retry.RetryContext} attribute key for an acknowledgment + * if the listener is capable of acknowledging. + */ + public static final String CONTEXT_ACKNOWLEDGMENT = "acknowledgment"; + + /** + * {@link org.springframework.retry.RetryContext} attribute key for the record. + */ + public static final String CONTEXT_RECORD = "record"; + private final RetryTemplate retryTemplate; private final RecoveryCallback recoveryCallback; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RetryingAcknowledgingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RetryingAcknowledgingMessageListenerAdapter.java index 3fb2b389..640660ef 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RetryingAcknowledgingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RetryingAcknowledgingMessageListenerAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2016-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -72,7 +72,8 @@ public class RetryingAcknowledgingMessageListenerAdapter extends AbstractR @Override public Void doWithRetry(RetryContext context) throws KafkaException { - context.setAttribute("record", record); + context.setAttribute(CONTEXT_RECORD, record); + context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment); RetryingAcknowledgingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment); return null; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RetryingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RetryingMessageListenerAdapter.java index 3a498fec..98e86fd0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RetryingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RetryingMessageListenerAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2016-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -70,7 +70,7 @@ public class RetryingMessageListenerAdapter extends AbstractRetryingMessag @Override public Void doWithRetry(RetryContext context) throws KafkaException { - context.setAttribute("record", record); + context.setAttribute(CONTEXT_RECORD, record); RetryingMessageListenerAdapter.this.delegate.onMessage(record); return null; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/RetryingMessageListenerAdapterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/RetryingMessageListenerAdapterTests.java new file mode 100644 index 00000000..ce686320 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/RetryingMessageListenerAdapterTests.java @@ -0,0 +1,77 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener.adapter; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.Test; + +import org.springframework.kafka.listener.AcknowledgingMessageListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.retry.RetryContext; +import org.springframework.retry.support.RetryTemplate; + +/** + * @author Gary Russell + * @since 1.0.7 + * + */ +public class RetryingMessageListenerAdapterTests { + + @Test + public void testRecoveryCallbackSimple() { + final AtomicReference context = new AtomicReference<>(); + RetryingMessageListenerAdapter adapter = new RetryingMessageListenerAdapter<>( + r -> { + throw new RuntimeException(); + }, new RetryTemplate(), c -> { + context.set(c); + return null; + }); + @SuppressWarnings("unchecked") + ConsumerRecord record = mock(ConsumerRecord.class); + adapter.onMessage(record); + assertThat(context.get()).isNotNull(); + assertThat(context.get().getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT)).isNull(); + assertThat(context.get().getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD)).isSameAs(record); + } + + @Test + public void testRecoveryCallbackAckOnly() { + final AtomicReference context = new AtomicReference<>(); + RetryingAcknowledgingMessageListenerAdapter adapter = + new RetryingAcknowledgingMessageListenerAdapter<>( + (AcknowledgingMessageListener) (r, a) -> { + throw new RuntimeException(); + }, new RetryTemplate(), c -> { + context.set(c); + return null; + }); + @SuppressWarnings("unchecked") + ConsumerRecord record = mock(ConsumerRecord.class); + Acknowledgment ack = mock(Acknowledgment.class); + adapter.onMessage(record, ack); + assertThat(context.get()).isNotNull(); + assertThat(context.get().getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT)).isSameAs(ack); + assertThat(context.get().getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD)).isSameAs(record); + } + +} diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc index 0039f89f..ec9f4e2c 100644 --- a/src/reference/asciidoc/kafka.adoc +++ b/src/reference/asciidoc/kafka.adoc @@ -367,6 +367,14 @@ In that case, the `ErrorHandler` will be invoked, if configured, or logged other When using `@KafkaListener`, set the `RetryTemplate` (and optionally `recoveryCallback`) on the container factory and the listener will be wrapped in the appropriate retrying adapter. +The contents of the `RetryContext` passed into the `RecoveryCallback` will depend on the type of listener. +The context will always have an attribute `record` which is the record for which the failure occurred. +If your listener is acknowledging the additional `acknowledgment` attribute is provided. +For convenience, the `AbstractRetryingMessageListenerAdapter` provides static constants for these keys. +See its javadocs for more information. + +A retry adapter is not provided for any of the batch <>. + ==== Serialization/Deserialization and Message Conversion Apache Kafka provides a high-level API for serializing/deserializing record values as well as their keys.