GH-3542: API for adding record interceptors instead of overriding them (#3937)
Fixes: https://github.com/spring-projects/spring-kafka/issues/3542 * Providing the ability to add record interceptors instead of overriding them Change `RecordInterceptor` to `List<RecordInterceptor>` in `MessageListenerContainer` will allow the addition of multiple `RecordInterceptor` instances instead of overriding the existing one Currently, only a single `RecordInterceptor` is supported. Users may want to register multiple `RecordInterceptors`. There are some workarounds, but they are not clean or ideal solutions. By supporting `List<RecordInterceptor`>, users can add their own interceptors via `setRecordInterceptor(...)`. * Adding new API for addRecordInterceptor. * Addressing PR review Signed-off-by: Sanghyeok An <ojt90902@naver.com>
This commit is contained in:
committed by
GitHub
parent
cadd306014
commit
38dba2f9b6
@@ -22,6 +22,35 @@ IMPORTANT: If the interceptor mutates the record (by creating a new one), the `t
|
||||
|
||||
The `CompositeRecordInterceptor` and `CompositeBatchInterceptor` can be used to invoke multiple interceptors.
|
||||
|
||||
Starting with version 4.0, `AbstractMessageListenerContainer` exposes `getRecordInterceptor()` as a public method.
|
||||
If the returned interceptor is an instance of `CompositeRecordInterceptor`, additional `RecordInterceptor` instances can be added to it even after the container instance extending `AbstractMessageListenerContainer` has been created and a `RecordInterceptor` has already been configured.
|
||||
The following example shows how to do so:
|
||||
|
||||
[source, java]
|
||||
----
|
||||
public void configureRecordInterceptor(KafkaMessageListenerContainer<Integer, String> container) {
|
||||
CompositeRecordInterceptor compositeInterceptor;
|
||||
|
||||
RecordInterceptor<Integer, String> previousInterceptor = container.getRecordInterceptor();
|
||||
if (previousInterceptor instanceof CompositeRecordInterceptor interceptor) {
|
||||
compositeInterceptor = interceptor;
|
||||
} else {
|
||||
compositeInterceptor = new CompositeRecordInterceptor<>();
|
||||
container.setRecordInterceptor(compositeInterceptor);
|
||||
}
|
||||
|
||||
if (previousInterceptor != null) {
|
||||
compositeRecordInterceptor.addRecordInterceptor(previousInterceptor);
|
||||
}
|
||||
|
||||
RecordInterceptor<Integer, String> recordInterceptor1 = new RecordInterceptor() {...};
|
||||
RecordInterceptor<Integer, String> recordInterceptor2 = new RecordInterceptor() {...};
|
||||
|
||||
compositeInterceptor.addRecordInterceptor(recordInterceptor1);
|
||||
compositeInterceptor.addRecordInterceptor(recordInterceptor2);
|
||||
}
|
||||
----
|
||||
|
||||
By default, starting with version 2.8, when using transactions, the interceptor is invoked before the transaction has started.
|
||||
You can set the listener container's `interceptBeforeTx` property to `false` to invoke the interceptor after the transaction has started instead.
|
||||
Starting with version 2.9, this will apply to any transaction manager, not just `KafkaAwareTransactionManager`+++s+++.
|
||||
@@ -265,4 +294,3 @@ The listener containers implement `SmartLifecycle`, and `autoStartup` is `true`
|
||||
The containers are started in a late phase (`Integer.MAX-VALUE - 100`).
|
||||
Other components that implement `SmartLifecycle`, to handle data from listeners, should be started in an earlier phase.
|
||||
The `- 100` leaves room for later phases to enable components to be auto-started after the containers.
|
||||
|
||||
|
||||
@@ -76,3 +76,9 @@ For details, see xref:kafka/receiving-messages/rebalance-listeners.adoc#new-reba
|
||||
|
||||
The `DefaultKafkaHeaderMapper` and `SimpleKafkaHeaderMapper` support multi-value header mapping for Kafka records.
|
||||
More details are available in xref:kafka/headers.adoc#multi-value-header[Support multi-value header mapping].
|
||||
|
||||
[[x40-add-record-interceptor]]
|
||||
=== Configure additional `RecordInterceptor`
|
||||
|
||||
Listener containers now support interceptor customization via `getRecordInterceptor()`.
|
||||
See the xref:kafka/receiving-messages/message-listener-container.adoc#message-listener-container[Message Listener Containers] section for details.
|
||||
|
||||
@@ -460,7 +460,12 @@ public abstract class AbstractMessageListenerContainer<K, V>
|
||||
this.kafkaAdmin = kafkaAdmin;
|
||||
}
|
||||
|
||||
protected @Nullable RecordInterceptor<K, V> getRecordInterceptor() {
|
||||
/**
|
||||
* Get the {@link RecordInterceptor} for modification, if configured.
|
||||
* @return the {@link RecordInterceptor}, or {@code null} if not configured
|
||||
* @since 4.0
|
||||
*/
|
||||
public @Nullable RecordInterceptor<K, V> getRecordInterceptor() {
|
||||
return this.recordInterceptor;
|
||||
}
|
||||
|
||||
|
||||
@@ -35,6 +35,7 @@ import org.springframework.util.Assert;
|
||||
*
|
||||
* @author Artem Bilan
|
||||
* @author Gary Russell
|
||||
* @author Sanghyeok An
|
||||
* @since 2.3
|
||||
*
|
||||
*/
|
||||
@@ -92,4 +93,13 @@ public class CompositeRecordInterceptor<K, V> implements RecordInterceptor<K, V>
|
||||
this.delegates.forEach(del -> del.afterRecord(record, consumer));
|
||||
}
|
||||
|
||||
/**
|
||||
* Add an {@link RecordInterceptor} to delegates.
|
||||
* @param recordInterceptor the interceptor.
|
||||
* @since 4.0
|
||||
*/
|
||||
public void addRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
|
||||
this.delegates.add(recordInterceptor);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -3842,7 +3842,7 @@ public class KafkaMessageListenerContainerTests {
|
||||
containerProps.setClientId("clientId");
|
||||
|
||||
CountDownLatch afterLatch = new CountDownLatch(1);
|
||||
RecordInterceptor<Integer, String> recordInterceptor = spy(new RecordInterceptor<Integer, String>() {
|
||||
RecordInterceptor<Integer, String> recordInterceptor1 = spy(new RecordInterceptor<Integer, String>() {
|
||||
|
||||
@Override
|
||||
public @NonNull ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String> record,
|
||||
@@ -3858,25 +3858,54 @@ public class KafkaMessageListenerContainerTests {
|
||||
|
||||
});
|
||||
|
||||
RecordInterceptor<Integer, String> recordInterceptor2 = spy(new RecordInterceptor<Integer, String>() {
|
||||
|
||||
@Override
|
||||
public @NonNull ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String> record,
|
||||
Consumer<Integer, String> consumer) {
|
||||
|
||||
return record;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clearThreadState(Consumer<?, ?> consumer) {
|
||||
afterLatch.countDown();
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
KafkaMessageListenerContainer<Integer, String> container =
|
||||
new KafkaMessageListenerContainer<>(cf, containerProps);
|
||||
container.setRecordInterceptor(recordInterceptor);
|
||||
container.setRecordInterceptor(new CompositeRecordInterceptor<>());
|
||||
if (container.getRecordInterceptor() instanceof CompositeRecordInterceptor<Integer, String> composite) {
|
||||
composite.addRecordInterceptor(recordInterceptor1);
|
||||
composite.addRecordInterceptor(recordInterceptor2);
|
||||
}
|
||||
|
||||
container.start();
|
||||
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
|
||||
assertThat(afterLatch.await(10, TimeUnit.SECONDS)).isTrue();
|
||||
|
||||
InOrder inOrder = inOrder(recordInterceptor, messageListener, consumer);
|
||||
inOrder.verify(recordInterceptor).setupThreadState(eq(consumer));
|
||||
InOrder inOrder = inOrder(recordInterceptor1, recordInterceptor2, messageListener, consumer);
|
||||
inOrder.verify(recordInterceptor1).setupThreadState(eq(consumer));
|
||||
inOrder.verify(recordInterceptor2).setupThreadState(eq(consumer));
|
||||
inOrder.verify(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
|
||||
inOrder.verify(recordInterceptor).intercept(eq(firstRecord), eq(consumer));
|
||||
inOrder.verify(recordInterceptor1).intercept(eq(firstRecord), eq(consumer));
|
||||
inOrder.verify(recordInterceptor2).intercept(eq(firstRecord), eq(consumer));
|
||||
inOrder.verify(messageListener).onMessage(eq(firstRecord));
|
||||
inOrder.verify(recordInterceptor).success(eq(firstRecord), eq(consumer));
|
||||
inOrder.verify(recordInterceptor).afterRecord(eq(firstRecord), eq(consumer));
|
||||
inOrder.verify(recordInterceptor).intercept(eq(secondRecord), eq(consumer));
|
||||
inOrder.verify(recordInterceptor1).success(eq(firstRecord), eq(consumer));
|
||||
inOrder.verify(recordInterceptor2).success(eq(firstRecord), eq(consumer));
|
||||
inOrder.verify(recordInterceptor1).afterRecord(eq(firstRecord), eq(consumer));
|
||||
inOrder.verify(recordInterceptor2).afterRecord(eq(firstRecord), eq(consumer));
|
||||
inOrder.verify(recordInterceptor1).intercept(eq(secondRecord), eq(consumer));
|
||||
inOrder.verify(recordInterceptor2).intercept(eq(secondRecord), eq(consumer));
|
||||
inOrder.verify(messageListener).onMessage(eq(secondRecord));
|
||||
inOrder.verify(recordInterceptor).success(eq(secondRecord), eq(consumer));
|
||||
inOrder.verify(recordInterceptor).afterRecord(eq(secondRecord), eq(consumer));
|
||||
inOrder.verify(recordInterceptor).clearThreadState(eq(consumer));
|
||||
inOrder.verify(recordInterceptor1).success(eq(secondRecord), eq(consumer));
|
||||
inOrder.verify(recordInterceptor2).success(eq(secondRecord), eq(consumer));
|
||||
inOrder.verify(recordInterceptor1).afterRecord(eq(secondRecord), eq(consumer));
|
||||
inOrder.verify(recordInterceptor2).afterRecord(eq(secondRecord), eq(consumer));
|
||||
inOrder.verify(recordInterceptor1).clearThreadState(eq(consumer));
|
||||
inOrder.verify(recordInterceptor2).clearThreadState(eq(consumer));
|
||||
container.stop();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user