From fb11f314fc190d5bc406bb4d07fb746f2787d11f Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Mon, 2 Jan 2023 11:09:38 +0100 Subject: [PATCH] Add assertions to fail early on absent values using `StreamMessageListenerContainer`. Closes #2472 --- .../stream/DefaultStreamMessageListenerContainer.java | 4 ++++ .../data/redis/stream/StreamMessageListenerContainer.java | 8 ++++++++ 2 files changed, 12 insertions(+) diff --git a/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java index a38ddafc6..352036726 100644 --- a/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java @@ -183,6 +183,10 @@ class DefaultStreamMessageListenerContainer> implement @Override public Subscription register(StreamReadRequest streamRequest, StreamListener listener) { + + Assert.notNull(streamRequest, "StreamReadRequest must not be null"); + Assert.notNull(listener, "StreamListener must not be null"); + return doRegister(getReadTask(streamRequest, listener)); } diff --git a/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java index 76f5ab486..14a66cf5e 100644 --- a/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java @@ -337,6 +337,9 @@ public interface StreamMessageListenerContainer> exten Predicate cancelSubscriptionOnError = t -> true; StreamReadRequestBuilder(StreamOffset streamOffset) { + + Assert.notNull(streamOffset, "StreamOffset must not be null"); + this.streamOffset = streamOffset; } @@ -355,6 +358,8 @@ public interface StreamMessageListenerContainer> exten */ public StreamReadRequestBuilder errorHandler(ErrorHandler errorHandler) { + Assert.notNull(errorHandler, "ErrorHandler must not be null"); + this.errorHandler = errorHandler; return this; } @@ -368,6 +373,7 @@ public interface StreamMessageListenerContainer> exten */ public StreamReadRequestBuilder cancelOnError(Predicate cancelSubscriptionOnError) { + Assert.notNull(cancelSubscriptionOnError, "cancelSubscriptionOnError Predicate must not be null"); this.cancelSubscriptionOnError = cancelSubscriptionOnError; return this; } @@ -440,6 +446,8 @@ public interface StreamMessageListenerContainer> exten */ public ConsumerStreamReadRequestBuilder consumer(Consumer consumer) { + Assert.notNull(consumer, "Consumer must not be null"); + this.consumer = consumer; return this; }