GH-319: Add Acknowledgment to Retry Context [1.x]
Fixes #319 Add the `Acknowledgment` to the retry context if the listener type warrants it. Conflicts: spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java src/reference/asciidoc/kafka.adoc
This commit is contained in:
committed by
Artem Bilan
parent
91da0b7b28
commit
2393cfea8a
@@ -36,6 +36,17 @@ public class AbstractRetryingMessageListenerAdapter<K, V> {
|
||||
|
||||
protected final Log logger = LogFactory.getLog(this.getClass());
|
||||
|
||||
/**
|
||||
* {@link org.springframework.retry.RetryContext} attribute key for an acknowledgment
|
||||
* if the listener is capable of acknowledging.
|
||||
*/
|
||||
public static final String CONTEXT_ACKNOWLEDGMENT = "acknowledgment";
|
||||
|
||||
/**
|
||||
* {@link org.springframework.retry.RetryContext} attribute key for the record.
|
||||
*/
|
||||
public static final String CONTEXT_RECORD = "record";
|
||||
|
||||
private final RetryTemplate retryTemplate;
|
||||
|
||||
private final RecoveryCallback<Void> recoveryCallback;
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2016 the original author or authors.
|
||||
* Copyright 2016-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -72,7 +72,8 @@ public class RetryingAcknowledgingMessageListenerAdapter<K, V> extends AbstractR
|
||||
|
||||
@Override
|
||||
public Void doWithRetry(RetryContext context) throws KafkaException {
|
||||
context.setAttribute("record", record);
|
||||
context.setAttribute(CONTEXT_RECORD, record);
|
||||
context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
|
||||
RetryingAcknowledgingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment);
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2016 the original author or authors.
|
||||
* Copyright 2016-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -70,7 +70,7 @@ public class RetryingMessageListenerAdapter<K, V> extends AbstractRetryingMessag
|
||||
|
||||
@Override
|
||||
public Void doWithRetry(RetryContext context) throws KafkaException {
|
||||
context.setAttribute("record", record);
|
||||
context.setAttribute(CONTEXT_RECORD, record);
|
||||
RetryingMessageListenerAdapter.this.delegate.onMessage(record);
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,77 @@
|
||||
/*
|
||||
* Copyright 2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.kafka.listener.adapter;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.kafka.listener.AcknowledgingMessageListener;
|
||||
import org.springframework.kafka.support.Acknowledgment;
|
||||
import org.springframework.retry.RetryContext;
|
||||
import org.springframework.retry.support.RetryTemplate;
|
||||
|
||||
/**
|
||||
* @author Gary Russell
|
||||
* @since 1.0.7
|
||||
*
|
||||
*/
|
||||
public class RetryingMessageListenerAdapterTests {
|
||||
|
||||
@Test
|
||||
public void testRecoveryCallbackSimple() {
|
||||
final AtomicReference<RetryContext> context = new AtomicReference<>();
|
||||
RetryingMessageListenerAdapter<String, String> adapter = new RetryingMessageListenerAdapter<>(
|
||||
r -> {
|
||||
throw new RuntimeException();
|
||||
}, new RetryTemplate(), c -> {
|
||||
context.set(c);
|
||||
return null;
|
||||
});
|
||||
@SuppressWarnings("unchecked")
|
||||
ConsumerRecord<String, String> record = mock(ConsumerRecord.class);
|
||||
adapter.onMessage(record);
|
||||
assertThat(context.get()).isNotNull();
|
||||
assertThat(context.get().getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT)).isNull();
|
||||
assertThat(context.get().getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD)).isSameAs(record);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecoveryCallbackAckOnly() {
|
||||
final AtomicReference<RetryContext> context = new AtomicReference<>();
|
||||
RetryingAcknowledgingMessageListenerAdapter<String, String> adapter =
|
||||
new RetryingAcknowledgingMessageListenerAdapter<>(
|
||||
(AcknowledgingMessageListener<String, String>) (r, a) -> {
|
||||
throw new RuntimeException();
|
||||
}, new RetryTemplate(), c -> {
|
||||
context.set(c);
|
||||
return null;
|
||||
});
|
||||
@SuppressWarnings("unchecked")
|
||||
ConsumerRecord<String, String> record = mock(ConsumerRecord.class);
|
||||
Acknowledgment ack = mock(Acknowledgment.class);
|
||||
adapter.onMessage(record, ack);
|
||||
assertThat(context.get()).isNotNull();
|
||||
assertThat(context.get().getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT)).isSameAs(ack);
|
||||
assertThat(context.get().getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD)).isSameAs(record);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -367,6 +367,14 @@ In that case, the `ErrorHandler` will be invoked, if configured, or logged other
|
||||
|
||||
When using `@KafkaListener`, set the `RetryTemplate` (and optionally `recoveryCallback`) on the container factory and the listener will be wrapped in the appropriate retrying adapter.
|
||||
|
||||
The contents of the `RetryContext` passed into the `RecoveryCallback` will depend on the type of listener.
|
||||
The context will always have an attribute `record` which is the record for which the failure occurred.
|
||||
If your listener is acknowledging the additional `acknowledgment` attribute is provided.
|
||||
For convenience, the `AbstractRetryingMessageListenerAdapter` provides static constants for these keys.
|
||||
See its javadocs for more information.
|
||||
|
||||
A retry adapter is not provided for any of the batch <<message-listeners, message listeners>>.
|
||||
|
||||
==== Serialization/Deserialization and Message Conversion
|
||||
|
||||
Apache Kafka provides a high-level API for serializing/deserializing record values as well as their keys.
|
||||
|
||||
Reference in New Issue
Block a user