Add overloads for StreamOperations add with additional XAddOptions.

Closes: #2915
Original Pull Request: #2936
This commit is contained in:
jinkshower
2024-07-07 21:13:59 +09:00
committed by Christoph Strobl
parent bfa62bf31d
commit 486dc97623
7 changed files with 488 additions and 1 deletions

View File

@@ -32,6 +32,7 @@ import org.springframework.data.redis.connection.ReactiveRedisConnection.Command
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.Consumer;
@@ -58,6 +59,7 @@ import org.springframework.util.StringUtils;
* @author Tugdual Grall
* @author Dengliming
* @author Mark John Moreno
* @author jinkshower
* @since 2.2
*/
public interface ReactiveStreamCommands {
@@ -394,11 +396,40 @@ public interface ReactiveStreamCommands {
return xAdd(Mono.just(AddStreamRecord.of(record))).next().map(CommandResponse::getOutput);
}
/**
* Add stream record with the specified options.
*
* @param record must not be {@literal null}.
* @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}.
* @return {@link Mono} the {@link RecordId id}.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
*/
default Mono<RecordId> xAdd(ByteBufferRecord record, XAddOptions xAddOptions) {
Assert.notNull(record, "Record must not be null");
Assert.notNull(xAddOptions, "XAddOptions must not be null");
AddStreamRecord addStreamRecord = AddStreamRecord.of(record)
.approximateTrimming(xAddOptions.isApproximateTrimming())
.makeNoStream(xAddOptions.isNoMkStream());
if (xAddOptions.hasMaxlen()) {
addStreamRecord = addStreamRecord.maxlen(xAddOptions.getMaxlen());
}
if (xAddOptions.hasMinId()) {
addStreamRecord = addStreamRecord.minId(xAddOptions.getMinId());
}
return xAdd(Mono.just(addStreamRecord)).next().map(CommandResponse::getOutput);
}
/**
* Add stream record with given {@literal body} to {@literal key}.
*
* @param commands must not be {@literal null}.
* @return {@link Flux} emitting the {@link RecordId} on by for for the given {@link AddStreamRecord} commands.
* @return {@link Flux} emitting the {@link RecordId} on by for the given {@link AddStreamRecord} commands.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
*/
Flux<CommandResponse<AddStreamRecord, RecordId>> xAdd(Publisher<AddStreamRecord> commands);

View File

@@ -33,6 +33,7 @@ import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.ReactiveStreamCommands;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.convert.Converters;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.Consumer;
@@ -60,6 +61,7 @@ import org.springframework.util.ClassUtils;
* @author Christoph Strobl
* @author Marcin Zielinski
* @author John Blum
* @author jinkshower
* @since 2.2
*/
class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperations<K, HK, HV> {
@@ -146,6 +148,18 @@ class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperat
return createMono(streamCommands -> streamCommands.xAdd(serializeRecord(input)));
}
@Override
public Mono<RecordId> add(Record<K, ?> record, XAddOptions xAddOptions) {
Assert.notNull(record.getStream(), "Key must not be null");
Assert.notNull(record.getValue(), "Body must not be null");
Assert.notNull(xAddOptions, "XAddOptions must not be null");
MapRecord<K, HK, HV> input = StreamObjectMapper.toMapRecord(this, record);
return createMono(streamCommands -> streamCommands.xAdd(serializeRecord(input), xAddOptions));
}
@Override
public Flux<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions) {

View File

@@ -26,6 +26,7 @@ import org.springframework.core.convert.ConversionService;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.Consumer;
@@ -54,6 +55,7 @@ import org.springframework.util.ClassUtils;
* @author Christoph Strobl
* @author Marcin Zielinski
* @author John Blum
* @author jinkshower
* @since 2.2
*/
class DefaultStreamOperations<K, HK, HV> extends AbstractOperations<K, Object> implements StreamOperations<K, HK, HV> {
@@ -136,6 +138,21 @@ class DefaultStreamOperations<K, HK, HV> extends AbstractOperations<K, Object> i
return execute(connection -> connection.xAdd(binaryRecord));
}
@Nullable
@Override
@SuppressWarnings("unchecked")
public RecordId add(Record<K , ?> record, XAddOptions options) {
Assert.notNull(record, "Record must not be null");
Assert.notNull(options, "XAddOptions must not be null");
MapRecord<K, HK, HV> input = StreamObjectMapper.toMapRecord(this, record);
ByteRecord binaryRecord = input.serialize(keySerializer(), hashKeySerializer(), hashValueSerializer());
return execute(connection -> connection.streamCommands().xAdd(binaryRecord, options));
}
@Override
public List<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions) {

View File

@@ -26,6 +26,7 @@ import org.reactivestreams.Publisher;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
@@ -54,6 +55,7 @@ import org.springframework.util.Assert;
* @author Dengliming
* @author Marcin Zielinski
* @author John Blum
* @author jinkshower
* @since 2.2
*/
public interface ReactiveStreamOperations<K, HK, HV> extends HashMapperProvider<HK, HV> {
@@ -94,6 +96,63 @@ public interface ReactiveStreamOperations<K, HK, HV> extends HashMapperProvider<
return acknowledge(record.getRequiredStream(), group, record.getId());
}
/**
* Append one or more records to the stream {@code key} with the specified options.
*
* @param key the stream key.
* @param bodyPublisher record body {@link Publisher}.
* @param xAddOptions parameters for the {@literal XADD} call.
* @return the record Ids.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
*/
default Flux<RecordId> add (K key, Publisher<? extends Map<? extends HK, ? extends HV>> bodyPublisher,
XAddOptions xAddOptions) {
return Flux.from(bodyPublisher).flatMap(it -> add(key, it, xAddOptions));
}
/**
* Append a record to the stream {@code key} with the specified options.
*
* @param key the stream key.
* @param content record content as Map.
* @param xAddOptions parameters for the {@literal XADD} call.
* @return the {@link Mono} emitting the {@link RecordId}.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
*/
default Mono<RecordId> add(K key, Map<? extends HK, ? extends HV> content, XAddOptions xAddOptions) {
return add(StreamRecords.newRecord().in(key).ofMap(content), xAddOptions);
}
/**
* Append a record, backed by a {@link Map} holding the field/value pairs, to the stream with the specified options.
*
* @param record the record to append.
* @param xAddOptions parameters for the {@literal XADD} call.
* @return the {@link Mono} emitting the {@link RecordId}.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
*/
@SuppressWarnings("unchecked")
default Mono<RecordId> add(MapRecord<K, ? extends HK, ? extends HV> record, XAddOptions xAddOptions) {
return add((Record) record, xAddOptions);
}
/**
* Append the record, backed by the given value, to the stream with the specified options.
* The value will be hashed and serialized.
*
* @param record must not be {@literal null}.
* @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}.
* @return the {@link Mono} emitting the {@link RecordId}.
* @see MapRecord
* @see ObjectRecord
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
*/
Mono<RecordId> add(Record<K, ?> record, XAddOptions xAddOptions);
/**
* Append one or more records to the stream {@code key}.
*

View File

@@ -25,6 +25,7 @@ import java.util.Map;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
@@ -53,6 +54,7 @@ import org.springframework.util.Assert;
* @author Dengliming
* @author Marcin Zielinski
* @author John Blum
* @author jinkshower
* @since 2.2
*/
public interface StreamOperations<K, HK, HV> extends HashMapperProvider<HK, HV> {
@@ -95,6 +97,53 @@ public interface StreamOperations<K, HK, HV> extends HashMapperProvider<HK, HV>
return acknowledge(record.getRequiredStream(), group, record.getId());
}
/**
* Append a record to the stream {@code key} with the specified options.
*
* @param key the stream key.
* @param content record content as Map.
* @param xAddOptions additional parameters for the {@literal XADD} call.
* @return the record Id. {@literal null} when used in pipeline / transaction.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
*/
@SuppressWarnings("unchecked")
@Nullable
default RecordId add(K key, Map<? extends HK, ? extends HV> content, XAddOptions xAddOptions) {
return add(StreamRecords.newRecord().in(key).ofMap(content), xAddOptions);
}
/**
* Append a record, backed by a {@link Map} holding the field/value pairs, to the stream with the specified options.
*
* @param record the record to append.
* @param xAddOptions additional parameters for the {@literal XADD} call.
* @return the record Id. {@literal null} when used in pipeline / transaction.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
*/
@SuppressWarnings("unchecked")
@Nullable
default RecordId add(MapRecord<K, ? extends HK, ? extends HV> record, XAddOptions xAddOptions) {
return add((Record) record, xAddOptions);
}
/**
* Append the record, backed by the given value, to the stream with the specified options.
* The value will be hashed and serialized.
*
* @param record must not be {@literal null}.
* @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}.
* @return the record Id. {@literal null} when used in pipeline / transaction.
* @see MapRecord
* @see ObjectRecord
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
*/
@SuppressWarnings("unchecked")
@Nullable
RecordId add(Record<K, ?> record, XAddOptions xAddOptions);
/**
* Append a record to the stream {@code key}.
*