diff --git a/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java index a38ddafc6..352036726 100644 --- a/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java @@ -183,6 +183,10 @@ class DefaultStreamMessageListenerContainer> implement @Override public Subscription register(StreamReadRequest streamRequest, StreamListener listener) { + + Assert.notNull(streamRequest, "StreamReadRequest must not be null"); + Assert.notNull(listener, "StreamListener must not be null"); + return doRegister(getReadTask(streamRequest, listener)); } diff --git a/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java index 76f5ab486..14a66cf5e 100644 --- a/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java @@ -337,6 +337,9 @@ public interface StreamMessageListenerContainer> exten Predicate cancelSubscriptionOnError = t -> true; StreamReadRequestBuilder(StreamOffset streamOffset) { + + Assert.notNull(streamOffset, "StreamOffset must not be null"); + this.streamOffset = streamOffset; } @@ -355,6 +358,8 @@ public interface StreamMessageListenerContainer> exten */ public StreamReadRequestBuilder errorHandler(ErrorHandler errorHandler) { + Assert.notNull(errorHandler, "ErrorHandler must not be null"); + this.errorHandler = errorHandler; return this; } @@ -368,6 +373,7 @@ public interface StreamMessageListenerContainer> exten */ public StreamReadRequestBuilder cancelOnError(Predicate cancelSubscriptionOnError) { + Assert.notNull(cancelSubscriptionOnError, "cancelSubscriptionOnError Predicate must not be null"); this.cancelSubscriptionOnError = cancelSubscriptionOnError; return this; } @@ -440,6 +446,8 @@ public interface StreamMessageListenerContainer> exten */ public ConsumerStreamReadRequestBuilder consumer(Consumer consumer) { + Assert.notNull(consumer, "Consumer must not be null"); + this.consumer = consumer; return this; }