Polishing.
Add new method to BoundStreamOperations and remove add accepting a Publisher. Original Pull Request: #2936
This commit is contained in:
@@ -403,7 +403,7 @@ public interface ReactiveStreamCommands {
|
||||
* @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}.
|
||||
* @return {@link Mono} the {@link RecordId id}.
|
||||
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
|
||||
* @since 3.3
|
||||
* @since 3.4
|
||||
*/
|
||||
default Mono<RecordId> xAdd(ByteBufferRecord record, XAddOptions xAddOptions) {
|
||||
|
||||
|
||||
@@ -20,6 +20,7 @@ import java.util.Map;
|
||||
|
||||
import org.springframework.data.domain.Range;
|
||||
import org.springframework.data.redis.connection.Limit;
|
||||
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
|
||||
import org.springframework.data.redis.connection.stream.Consumer;
|
||||
import org.springframework.data.redis.connection.stream.MapRecord;
|
||||
import org.springframework.data.redis.connection.stream.ReadOffset;
|
||||
@@ -58,6 +59,18 @@ public interface BoundStreamOperations<K, HK, HV> {
|
||||
@Nullable
|
||||
RecordId add(Map<HK, HV> body);
|
||||
|
||||
/**
|
||||
* Append a record to the stream {@code key} with the specified options.
|
||||
*
|
||||
* @param content record content as Map.
|
||||
* @param xAddOptions additional parameters for the {@literal XADD} call.
|
||||
* @return the record Id. {@literal null} when used in pipeline / transaction.
|
||||
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
|
||||
* @since 3.4
|
||||
*/
|
||||
@Nullable
|
||||
RecordId add(Map<HK, HV> content, XAddOptions xAddOptions);
|
||||
|
||||
/**
|
||||
* Removes the specified entries from the stream. Returns the number of items deleted, that may be different from the
|
||||
* number of IDs passed in case certain IDs do not exist.
|
||||
|
||||
@@ -96,21 +96,6 @@ public interface ReactiveStreamOperations<K, HK, HV> extends HashMapperProvider<
|
||||
return acknowledge(record.getRequiredStream(), group, record.getId());
|
||||
}
|
||||
|
||||
/**
|
||||
* Append one or more records to the stream {@code key} with the specified options.
|
||||
*
|
||||
* @param key the stream key.
|
||||
* @param bodyPublisher record body {@link Publisher}.
|
||||
* @param xAddOptions parameters for the {@literal XADD} call.
|
||||
* @return the record Ids.
|
||||
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
|
||||
* @since 3.3
|
||||
*/
|
||||
default Flux<RecordId> add (K key, Publisher<? extends Map<? extends HK, ? extends HV>> bodyPublisher,
|
||||
XAddOptions xAddOptions) {
|
||||
return Flux.from(bodyPublisher).flatMap(it -> add(key, it, xAddOptions));
|
||||
}
|
||||
|
||||
/**
|
||||
* Append a record to the stream {@code key} with the specified options.
|
||||
*
|
||||
@@ -119,7 +104,7 @@ public interface ReactiveStreamOperations<K, HK, HV> extends HashMapperProvider<
|
||||
* @param xAddOptions parameters for the {@literal XADD} call.
|
||||
* @return the {@link Mono} emitting the {@link RecordId}.
|
||||
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
|
||||
* @since 3.3
|
||||
* @since 3.4
|
||||
*/
|
||||
default Mono<RecordId> add(K key, Map<? extends HK, ? extends HV> content, XAddOptions xAddOptions) {
|
||||
return add(StreamRecords.newRecord().in(key).ofMap(content), xAddOptions);
|
||||
@@ -132,7 +117,7 @@ public interface ReactiveStreamOperations<K, HK, HV> extends HashMapperProvider<
|
||||
* @param xAddOptions parameters for the {@literal XADD} call.
|
||||
* @return the {@link Mono} emitting the {@link RecordId}.
|
||||
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
|
||||
* @since 3.3
|
||||
* @since 3.4
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
default Mono<RecordId> add(MapRecord<K, ? extends HK, ? extends HV> record, XAddOptions xAddOptions) {
|
||||
@@ -149,7 +134,7 @@ public interface ReactiveStreamOperations<K, HK, HV> extends HashMapperProvider<
|
||||
* @see MapRecord
|
||||
* @see ObjectRecord
|
||||
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
|
||||
* @since 3.3
|
||||
* @since 3.4
|
||||
*/
|
||||
Mono<RecordId> add(Record<K, ?> record, XAddOptions xAddOptions);
|
||||
|
||||
|
||||
@@ -105,7 +105,7 @@ public interface StreamOperations<K, HK, HV> extends HashMapperProvider<HK, HV>
|
||||
* @param xAddOptions additional parameters for the {@literal XADD} call.
|
||||
* @return the record Id. {@literal null} when used in pipeline / transaction.
|
||||
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
|
||||
* @since 3.3
|
||||
* @since 3.4
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
@Nullable
|
||||
@@ -120,7 +120,7 @@ public interface StreamOperations<K, HK, HV> extends HashMapperProvider<HK, HV>
|
||||
* @param xAddOptions additional parameters for the {@literal XADD} call.
|
||||
* @return the record Id. {@literal null} when used in pipeline / transaction.
|
||||
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
|
||||
* @since 3.3
|
||||
* @since 3.4
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
@Nullable
|
||||
@@ -138,7 +138,7 @@ public interface StreamOperations<K, HK, HV> extends HashMapperProvider<HK, HV>
|
||||
* @see MapRecord
|
||||
* @see ObjectRecord
|
||||
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
|
||||
* @since 3.3
|
||||
* @since 3.4
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
@Nullable
|
||||
|
||||
Reference in New Issue
Block a user