Add assertions to fail early on absent values using StreamMessageListenerContainer.

Closes #2472
This commit is contained in:
Mark Paluch
2023-01-02 11:09:38 +01:00
parent d9b6d4d395
commit fb11f314fc
2 changed files with 12 additions and 0 deletions

View File

@@ -183,6 +183,10 @@ class DefaultStreamMessageListenerContainer<K, V extends Record<K, ?>> implement
@Override
public Subscription register(StreamReadRequest<K> streamRequest, StreamListener<K, V> listener) {
Assert.notNull(streamRequest, "StreamReadRequest must not be null");
Assert.notNull(listener, "StreamListener must not be null");
return doRegister(getReadTask(streamRequest, listener));
}

View File

@@ -337,6 +337,9 @@ public interface StreamMessageListenerContainer<K, V extends Record<K, ?>> exten
Predicate<Throwable> cancelSubscriptionOnError = t -> true;
StreamReadRequestBuilder(StreamOffset<K> streamOffset) {
Assert.notNull(streamOffset, "StreamOffset must not be null");
this.streamOffset = streamOffset;
}
@@ -355,6 +358,8 @@ public interface StreamMessageListenerContainer<K, V extends Record<K, ?>> exten
*/
public StreamReadRequestBuilder<K> errorHandler(ErrorHandler errorHandler) {
Assert.notNull(errorHandler, "ErrorHandler must not be null");
this.errorHandler = errorHandler;
return this;
}
@@ -368,6 +373,7 @@ public interface StreamMessageListenerContainer<K, V extends Record<K, ?>> exten
*/
public StreamReadRequestBuilder<K> cancelOnError(Predicate<Throwable> cancelSubscriptionOnError) {
Assert.notNull(cancelSubscriptionOnError, "cancelSubscriptionOnError Predicate must not be null");
this.cancelSubscriptionOnError = cancelSubscriptionOnError;
return this;
}
@@ -440,6 +446,8 @@ public interface StreamMessageListenerContainer<K, V extends Record<K, ?>> exten
*/
public ConsumerStreamReadRequestBuilder<K> consumer(Consumer consumer) {
Assert.notNull(consumer, "Consumer must not be null");
this.consumer = consumer;
return this;
}