diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java index 8ba39b08e..f6e5687cf 100644 --- a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java @@ -403,7 +403,7 @@ public interface ReactiveStreamCommands { * @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}. * @return {@link Mono} the {@link RecordId id}. * @see Redis Documentation: XADD - * @since 3.3 + * @since 3.4 */ default Mono xAdd(ByteBufferRecord record, XAddOptions xAddOptions) { diff --git a/src/main/java/org/springframework/data/redis/core/BoundStreamOperations.java b/src/main/java/org/springframework/data/redis/core/BoundStreamOperations.java index b589c9916..bbba7bf01 100644 --- a/src/main/java/org/springframework/data/redis/core/BoundStreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/BoundStreamOperations.java @@ -20,6 +20,7 @@ import java.util.Map; import org.springframework.data.domain.Range; import org.springframework.data.redis.connection.Limit; +import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.ReadOffset; @@ -58,6 +59,18 @@ public interface BoundStreamOperations { @Nullable RecordId add(Map body); + /** + * Append a record to the stream {@code key} with the specified options. + * + * @param content record content as Map. + * @param xAddOptions additional parameters for the {@literal XADD} call. + * @return the record Id. {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: XADD + * @since 3.4 + */ + @Nullable + RecordId add(Map content, XAddOptions xAddOptions); + /** * Removes the specified entries from the stream. Returns the number of items deleted, that may be different from the * number of IDs passed in case certain IDs do not exist. diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java index e4b905030..65697faf2 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java @@ -96,21 +96,6 @@ public interface ReactiveStreamOperations extends HashMapperProvider< return acknowledge(record.getRequiredStream(), group, record.getId()); } - /** - * Append one or more records to the stream {@code key} with the specified options. - * - * @param key the stream key. - * @param bodyPublisher record body {@link Publisher}. - * @param xAddOptions parameters for the {@literal XADD} call. - * @return the record Ids. - * @see Redis Documentation: XADD - * @since 3.3 - */ - default Flux add (K key, Publisher> bodyPublisher, - XAddOptions xAddOptions) { - return Flux.from(bodyPublisher).flatMap(it -> add(key, it, xAddOptions)); - } - /** * Append a record to the stream {@code key} with the specified options. * @@ -119,7 +104,7 @@ public interface ReactiveStreamOperations extends HashMapperProvider< * @param xAddOptions parameters for the {@literal XADD} call. * @return the {@link Mono} emitting the {@link RecordId}. * @see Redis Documentation: XADD - * @since 3.3 + * @since 3.4 */ default Mono add(K key, Map content, XAddOptions xAddOptions) { return add(StreamRecords.newRecord().in(key).ofMap(content), xAddOptions); @@ -132,7 +117,7 @@ public interface ReactiveStreamOperations extends HashMapperProvider< * @param xAddOptions parameters for the {@literal XADD} call. * @return the {@link Mono} emitting the {@link RecordId}. * @see Redis Documentation: XADD - * @since 3.3 + * @since 3.4 */ @SuppressWarnings("unchecked") default Mono add(MapRecord record, XAddOptions xAddOptions) { @@ -149,7 +134,7 @@ public interface ReactiveStreamOperations extends HashMapperProvider< * @see MapRecord * @see ObjectRecord * @see Redis Documentation: XADD - * @since 3.3 + * @since 3.4 */ Mono add(Record record, XAddOptions xAddOptions); diff --git a/src/main/java/org/springframework/data/redis/core/StreamOperations.java b/src/main/java/org/springframework/data/redis/core/StreamOperations.java index c38a524ff..fa30db9f7 100644 --- a/src/main/java/org/springframework/data/redis/core/StreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/StreamOperations.java @@ -105,7 +105,7 @@ public interface StreamOperations extends HashMapperProvider * @param xAddOptions additional parameters for the {@literal XADD} call. * @return the record Id. {@literal null} when used in pipeline / transaction. * @see Redis Documentation: XADD - * @since 3.3 + * @since 3.4 */ @SuppressWarnings("unchecked") @Nullable @@ -120,7 +120,7 @@ public interface StreamOperations extends HashMapperProvider * @param xAddOptions additional parameters for the {@literal XADD} call. * @return the record Id. {@literal null} when used in pipeline / transaction. * @see Redis Documentation: XADD - * @since 3.3 + * @since 3.4 */ @SuppressWarnings("unchecked") @Nullable @@ -138,7 +138,7 @@ public interface StreamOperations extends HashMapperProvider * @see MapRecord * @see ObjectRecord * @see Redis Documentation: XADD - * @since 3.3 + * @since 3.4 */ @SuppressWarnings("unchecked") @Nullable